Skip to content

Commit 3855948

Browse files
authored
feat: replicate trigger_source, root_trigger_source, and is_warm_start to ClickHouse (#3274)
Adds three new top-level columns to the ClickHouse task_runs_v2 table primarily for analytics: - `trigger_source` / `root_trigger_source` - extracted from the existing TaskRun.annotations JSON during WAL replication - `is_warm_start` - new nullable boolean on TaskRun in Postgres, set in the existing taskRun.update() at attempt start (no additional write). null until the first attempt starts. Run region is already available via the existing `worker_queue` column in ClickHouse.
1 parent efcafdf commit 3855948

File tree

8 files changed

+72
-5
lines changed

8 files changed

+72
-5
lines changed

apps/webapp/app/services/runsReplicationService.server.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import { Logger, type LogLevel } from "@trigger.dev/core/logger";
2222
import { tryCatch } from "@trigger.dev/core/utils";
2323
import { parsePacketAsJson } from "@trigger.dev/core/v3/utils/ioSerialization";
2424
import { unsafeExtractIdempotencyKeyScope, unsafeExtractIdempotencyKeyUser } from "@trigger.dev/core/v3/serverOnly";
25+
import { RunAnnotations } from "@trigger.dev/core/v3";
2526
import { type TaskRun } from "@trigger.dev/database";
2627
import { nanoid } from "nanoid";
2728
import EventEmitter from "node:events";
@@ -866,6 +867,8 @@ export class RunsReplicationService {
866867
? calculateErrorFingerprint(run.error)
867868
: '';
868869

870+
const annotations = this.#parseAnnotations(run.annotations);
871+
869872
// Return array matching TASK_RUN_COLUMNS order
870873
return [
871874
run.runtimeEnvironmentId, // environment_id
@@ -916,9 +919,16 @@ export class RunsReplicationService {
916919
run.bulkActionGroupIds ?? [], // bulk_action_group_ids
917920
run.masterQueue ?? "", // worker_queue
918921
run.maxDurationInSeconds ?? null, // max_duration_in_seconds
922+
annotations?.triggerSource ?? "", // trigger_source
923+
annotations?.rootTriggerSource ?? "", // root_trigger_source
924+
run.isWarmStart ?? null, // is_warm_start
919925
];
920926
}
921927

928+
#parseAnnotations(annotations: unknown) {
929+
return RunAnnotations.safeParse(annotations).data;
930+
}
931+
922932
async #preparePayloadInsert(run: TaskRun, _version: bigint): Promise<PayloadInsertArray> {
923933
const payload = await this.#prepareJson(run.payload, run.payloadType);
924934

apps/webapp/test/runsReplicationService.part1.test.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,12 @@ describe("RunsReplicationService (part 1/2)", () => {
8686
organizationId: organization.id,
8787
environmentType: "DEVELOPMENT",
8888
engine: "V2",
89+
annotations: {
90+
triggerSource: "api",
91+
triggerAction: "trigger",
92+
rootTriggerSource: "dashboard",
93+
},
94+
isWarmStart: true,
8995
},
9096
});
9197

@@ -111,6 +117,9 @@ describe("RunsReplicationService (part 1/2)", () => {
111117
organization_id: organization.id,
112118
environment_type: "DEVELOPMENT",
113119
engine: "V2",
120+
trigger_source: "api",
121+
root_trigger_source: "dashboard",
122+
is_warm_start: 1,
114123
})
115124
);
116125

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
-- +goose Up
2+
ALTER TABLE trigger_dev.task_runs_v2
3+
ADD COLUMN trigger_source LowCardinality(String) DEFAULT '';
4+
5+
ALTER TABLE trigger_dev.task_runs_v2
6+
ADD COLUMN root_trigger_source LowCardinality(String) DEFAULT '';
7+
8+
ALTER TABLE trigger_dev.task_runs_v2
9+
ADD COLUMN is_warm_start Nullable(UInt8) DEFAULT NULL;
10+
11+
-- +goose Down
12+
ALTER TABLE trigger_dev.task_runs_v2
13+
DROP COLUMN trigger_source;
14+
15+
ALTER TABLE trigger_dev.task_runs_v2
16+
DROP COLUMN root_trigger_source;
17+
18+
ALTER TABLE trigger_dev.task_runs_v2
19+
DROP COLUMN is_warm_start;

internal-packages/clickhouse/src/taskRuns.test.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,9 @@ describe("Task Runs V2", () => {
8282
["bulk_action_group_id_1234", "bulk_action_group_id_1235"], // bulk_action_group_ids
8383
"", // worker_queue
8484
null, // max_duration_in_seconds
85+
"", // trigger_source
86+
"", // root_trigger_source
87+
null, // is_warm_start
8588
];
8689

8790
const [insertError, insertResult] = await insert([taskRunData]);
@@ -210,6 +213,9 @@ describe("Task Runs V2", () => {
210213
[], // bulk_action_group_ids
211214
"", // worker_queue
212215
null, // max_duration_in_seconds
216+
"", // trigger_source
217+
"", // root_trigger_source
218+
null, // is_warm_start
213219
];
214220

215221
const run2: TaskRunInsertArray = [
@@ -261,6 +267,9 @@ describe("Task Runs V2", () => {
261267
[], // bulk_action_group_ids
262268
"", // worker_queue
263269
null, // max_duration_in_seconds
270+
"", // trigger_source
271+
"", // root_trigger_source
272+
null, // is_warm_start
264273
];
265274

266275
const [insertError, insertResult] = await insert([run1, run2]);
@@ -359,6 +368,9 @@ describe("Task Runs V2", () => {
359368
[], // bulk_action_group_ids
360369
"", // worker_queue
361370
null, // max_duration_in_seconds
371+
"", // trigger_source
372+
"", // root_trigger_source
373+
null, // is_warm_start
362374
];
363375

364376
const [insertError, insertResult] = await insert([taskRun]);

internal-packages/clickhouse/src/taskRuns.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ export const TaskRunV2 = z.object({
4949
bulk_action_group_ids: z.array(z.string()).default([]),
5050
worker_queue: z.string().default(""),
5151
max_duration_in_seconds: z.number().int().nullish(),
52+
trigger_source: z.string().default(""),
53+
root_trigger_source: z.string().default(""),
54+
is_warm_start: z.boolean().nullish(),
5255
_version: z.string(),
5356
_is_deleted: z.number().int().default(0),
5457
});
@@ -105,6 +108,9 @@ export const TASK_RUN_COLUMNS = [
105108
"bulk_action_group_ids",
106109
"worker_queue",
107110
"max_duration_in_seconds",
111+
"trigger_source",
112+
"root_trigger_source",
113+
"is_warm_start",
108114
] as const;
109115

110116
export type TaskRunColumnName = (typeof TASK_RUN_COLUMNS)[number];
@@ -168,6 +174,9 @@ export type TaskRunFieldTypes = {
168174
bulk_action_group_ids: string[];
169175
worker_queue: string;
170176
max_duration_in_seconds: number | null;
177+
trigger_source: string;
178+
root_trigger_source: string;
179+
is_warm_start: boolean | null;
171180
};
172181

173182
/**
@@ -302,6 +311,9 @@ export type TaskRunInsertArray = [
302311
bulk_action_group_ids: string[],
303312
worker_queue: string,
304313
max_duration_in_seconds: number | null,
314+
trigger_source: string,
315+
root_trigger_source: string,
316+
is_warm_start: boolean | null,
305317
];
306318

307319
/**
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-- AlterTable
2+
ALTER TABLE "public"."TaskRun" ADD COLUMN "isWarmStart" BOOLEAN;

internal-packages/database/prisma/schema.prisma

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -537,13 +537,13 @@ model BackgroundWorkerFile {
537537
}
538538

539539
model Prompt {
540-
id String @id @default(cuid())
541-
friendlyId String @unique @map("friendly_id")
540+
id String @id @default(cuid())
541+
friendlyId String @unique @map("friendly_id")
542542
slug String
543543
description String?
544544
type String @default("text") // "text" | "chat"
545545
546-
organization Organization @relation(fields: [organizationId], references: [id], onDelete: Cascade, onUpdate: Cascade)
546+
organization Organization @relation(fields: [organizationId], references: [id], onDelete: Cascade, onUpdate: Cascade)
547547
organizationId String
548548
549549
project Project @relation(fields: [projectId], references: [id], onDelete: Cascade, onUpdate: Cascade)
@@ -558,7 +558,7 @@ model Prompt {
558558
defaultModel String?
559559
defaultConfig Json?
560560
561-
tags String[] @default([])
561+
tags String[] @default([])
562562
archivedAt DateTime?
563563
564564
createdAt DateTime @default(now())
@@ -840,6 +840,9 @@ model TaskRun {
840840
/// Structured annotations: triggerSource, triggerAction, rootTriggerSource, rootScheduleId
841841
annotations Json?
842842
843+
/// Whether the latest attempt was a warm start. Null until first attempt starts.
844+
isWarmStart Boolean?
845+
843846
/// Run output
844847
output String?
845848
outputType String @default("application/json")
@@ -857,7 +860,6 @@ model TaskRun {
857860
/// Store the stream keys that are being used by the run
858861
realtimeStreams String[] @default([])
859862
860-
861863
@@unique([oneTimeUseToken])
862864
@@unique([runtimeEnvironmentId, taskIdentifier, idempotencyKey])
863865
// Finding child runs

internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -402,6 +402,7 @@ export class RunAttemptSystem {
402402
status: "EXECUTING",
403403
attemptNumber: nextAttemptNumber,
404404
executedAt: taskRun.attemptNumber === null ? new Date() : undefined,
405+
isWarmStart: isWarmStart ?? false,
405406
},
406407
select: {
407408
id: true,

0 commit comments

Comments
 (0)