Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions cmd/hatchet-migrate/migrate/migrations/20250912172758_v1_0_43.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
-- +goose Up
-- +goose StatementBegin
-- +goose no transaction
BEGIN;

CREATE TABLE v1_dag_to_task_partitioned (
dag_id BIGINT NOT NULL,
dag_inserted_at TIMESTAMPTZ NOT NULL,
task_id BIGINT NOT NULL,
task_inserted_at TIMESTAMPTZ NOT NULL,
PRIMARY KEY (dag_id, dag_inserted_at, task_id, task_inserted_at)
) PARTITION BY RANGE(dag_inserted_at);

DO $$
DECLARE
new_table_name TEXT;
BEGIN
-- We want to attach all of the existing data as today's partition, which
-- includes everything up until the _end_ of today.
-- The new partition will be named with today's date, but the end time will be tomorrow (midnight).
new_table_name := 'v1_dag_to_task_' || TO_CHAR(NOW()::DATE, 'YYYYMMDD');

RAISE NOTICE 'Renaming existing table to %', new_table_name;

EXECUTE format('ALTER TABLE v1_dag_to_task RENAME TO %I', new_table_name);
EXECUTE format('ALTER INDEX v1_dag_to_task_pkey RENAME TO %I', new_table_name || '_pkey');

EXECUTE
format('ALTER TABLE %s SET (
autovacuum_vacuum_scale_factor = ''0.1'',
autovacuum_analyze_scale_factor=''0.05'',
autovacuum_vacuum_threshold=''25'',
autovacuum_analyze_threshold=''25'',
autovacuum_vacuum_cost_delay=''10'',
autovacuum_vacuum_cost_limit=''1000''
)', new_table_name);

EXECUTE
format(
'ALTER TABLE v1_dag_to_task_partitioned ATTACH PARTITION %s FOR VALUES FROM (''19700101'') TO (''%s'')',
new_table_name,
TO_CHAR((NOW() + INTERVAL '1 day')::DATE, 'YYYYMMDD')
);
END $$;

ALTER TABLE v1_dag_to_task_partitioned RENAME TO v1_dag_to_task;
ALTER INDEX v1_dag_to_task_partitioned_pkey RENAME TO v1_dag_to_task_pkey;

SELECT create_v1_range_partition('v1_dag_to_task', (NOW() + INTERVAL '1 day')::DATE);
COMMIT;

BEGIN;
ANALYZE v1_dag_to_task;
COMMIT;
-- +goose StatementEnd

-- +goose Down
-- +goose StatementBegin
CREATE TABLE v1_dag_to_task_original (
dag_id BIGINT NOT NULL,
dag_inserted_at TIMESTAMPTZ NOT NULL,
task_id BIGINT NOT NULL,
task_inserted_at TIMESTAMPTZ NOT NULL,
PRIMARY KEY (dag_id, dag_inserted_at, task_id, task_inserted_at)
);

INSERT INTO v1_dag_to_task_original
SELECT * FROM v1_dag_to_task;

DROP TABLE v1_dag_to_task;
ALTER TABLE v1_dag_to_task_original RENAME TO v1_dag_to_task;
ALTER INDEX v1_dag_to_task_original_pkey RENAME TO v1_dag_to_task_pkey;
-- +goose StatementEnd
18 changes: 18 additions & 0 deletions pkg/repository/v1/olap.go
Original file line number Diff line number Diff line change
Expand Up @@ -2604,6 +2604,24 @@ func (r *OLAPRepositoryImpl) AnalyzeOLAPTables(ctx context.Context) error {
return fmt.Errorf("error analyzing v1_dags_olap: %v", err)
}

err = r.queries.AnalyzeV1EventsOLAP(ctx, tx)

if err != nil {
return fmt.Errorf("error analyzing v1_events_olap: %v", err)
}

err = r.queries.AnalyzeV1EventLookupTableOLAP(ctx, tx)

if err != nil {
return fmt.Errorf("error analyzing v1_event_lookup_table_olap: %v", err)
}

err = r.queries.AnalyzeV1EventToRunOLAP(ctx, tx)

if err != nil {
return fmt.Errorf("error analyzing v1_event_to_run_olap: %v", err)
}

err = r.queries.AnalyzeV1DAGToTaskOLAP(ctx, tx)

if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions pkg/repository/v1/sqlcv1/dags.sql
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,6 @@ INSERT INTO v1_dag_data (
$3,
$4
);

-- name: AnalyzeV1DAG :exec
ANALYZE v1_dag;
9 changes: 9 additions & 0 deletions pkg/repository/v1/sqlcv1/dags.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions pkg/repository/v1/sqlcv1/olap.sql
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ ANALYZE v1_tasks_olap;
-- name: AnalyzeV1DAGsOLAP :exec
ANALYZE v1_dags_olap;

-- name: AnalyzeV1EventsOLAP :exec
ANALYZE v1_events_olap;

-- name: AnalyzeV1EventLookupTableOLAP :exec
ANALYZE v1_event_lookup_table_olap;

-- name: AnalyzeV1EventToRunOLAP :exec
ANALYZE v1_event_to_run_olap;

-- name: AnalyzeV1DAGToTaskOLAP :exec
ANALYZE v1_dag_to_task_olap;

Expand Down
27 changes: 27 additions & 0 deletions pkg/repository/v1/sqlcv1/olap.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 32 additions & 10 deletions pkg/repository/v1/sqlcv1/tasks.sql
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
-- name: CreatePartitions :exec
SELECT
create_v1_range_partition('v1_task', @date::date),
create_v1_range_partition('v1_dag', @date::date),
create_v1_range_partition('v1_task_event', @date::date),
create_v1_range_partition('v1_log_line', @date::date),
create_v1_range_partition('v1_payload', @date::date);
-- intentionally formatted this way to limit merge conflicts
create_v1_range_partition('v1_task', @date::date)
, create_v1_range_partition('v1_dag', @date::date)
, create_v1_range_partition('v1_task_event', @date::date)
, create_v1_range_partition('v1_log_line', @date::date)
, create_v1_range_partition('v1_payload', @date::date)
, create_v1_range_partition('v1_dag_to_task', @date::date)
;

-- name: EnsureTablePartitionsExist :one
WITH tomorrow_date AS (
Expand Down Expand Up @@ -33,17 +36,26 @@ SELECT
FROM partition_check;

-- name: ListPartitionsBeforeDate :many
WITH task_partitions AS (
WITH
-- intentionally formatted this way to limit merge conflicts
task_partitions AS (
SELECT 'v1_task' AS parent_table, p::text as partition_name FROM get_v1_partitions_before_date('v1_task', @date::date) AS p
), dag_partitions AS (
)
, dag_partitions AS (
SELECT 'v1_dag' AS parent_table, p::text as partition_name FROM get_v1_partitions_before_date('v1_dag', @date::date) AS p
), task_event_partitions AS (
)
, task_event_partitions AS (
SELECT 'v1_task_event' AS parent_table, p::text as partition_name FROM get_v1_partitions_before_date('v1_task_event', @date::date) AS p
), log_line_partitions AS (
)
, log_line_partitions AS (
SELECT 'v1_log_line' AS parent_table, p::text as partition_name FROM get_v1_partitions_before_date('v1_log_line', @date::date) AS p
), payload_partitions AS (
)
, payload_partitions AS (
SELECT 'v1_payload' AS parent_table, p::text as partition_name FROM get_v1_partitions_before_date('v1_payload', @date::date) AS p
)
, dag_to_task_partitions AS (
SELECT 'v1_dag_to_task' AS parent_table, p::text as partition_name FROM get_v1_partitions_before_date('v1_dag_to_task', @date::date) AS p
)

SELECT
*
Expand Down Expand Up @@ -77,6 +89,13 @@ SELECT
*
FROM
payload_partitions

UNION ALL

SELECT
*
FROM
dag_to_task_partitions
;

-- name: DefaultTaskActivityGauge :one
Expand Down Expand Up @@ -919,6 +938,9 @@ ANALYZE v1_task;
-- name: AnalyzeV1TaskEvent :exec
ANALYZE v1_task_event;

-- name: AnalyzeV1DAGToTask :exec
ANALYZE v1_dag_to_task;

-- name: AnalyzeV1Dag :exec
ANALYZE v1_dag;

Expand Down
47 changes: 37 additions & 10 deletions pkg/repository/v1/sqlcv1/tasks.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion pkg/repository/v1/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -3634,7 +3634,13 @@ func (r *TaskRepositoryImpl) AnalyzeTaskTables(ctx context.Context) error {
return fmt.Errorf("error analyzing v1_task_event: %v", err)
}

err = r.queries.AnalyzeV1Dag(ctx, tx)
err = r.queries.AnalyzeV1DAGToTask(ctx, tx)

if err != nil {
return fmt.Errorf("error analyzing v1_dag_to_task: %v", err)
}

err = r.queries.AnalyzeV1DAG(ctx, tx)

if err != nil {
return fmt.Errorf("error analyzing v1_dag: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion sql/schema/v1-core.sql
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ CREATE TABLE v1_dag_to_task (
task_id BIGINT NOT NULL,
task_inserted_at TIMESTAMPTZ NOT NULL,
CONSTRAINT v1_dag_to_task_pkey PRIMARY KEY (dag_id, dag_inserted_at, task_id, task_inserted_at)
);
) PARTITION BY RANGE(dag_inserted_at);

CREATE TABLE v1_dag_data (
dag_id BIGINT NOT NULL,
Expand Down
Loading