Skip to content

Commit 2e1c4f6

Browse files
authored
fix(clickhouse): partition by insertion date to prevent "Too many parts" errors when partitioning by start time (#2719)
1 parent 2bf86dc commit 2e1c4f6

File tree

10 files changed

+356
-22
lines changed

10 files changed

+356
-22
lines changed

apps/webapp/app/env.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1150,7 +1150,7 @@ const EnvironmentSchema = z
11501150
EVENTS_CLICKHOUSE_ASYNC_INSERT_MAX_DATA_SIZE: z.coerce.number().int().default(10485760),
11511151
EVENTS_CLICKHOUSE_ASYNC_INSERT_BUSY_TIMEOUT_MS: z.coerce.number().int().default(5000),
11521152
EVENT_REPOSITORY_CLICKHOUSE_ROLLOUT_PERCENT: z.coerce.number().optional(),
1153-
EVENT_REPOSITORY_DEFAULT_STORE: z.enum(["postgres", "clickhouse"]).default("postgres"),
1153+
EVENT_REPOSITORY_DEFAULT_STORE: z.enum(["postgres", "clickhouse", "clickhouse_v2"]).default("postgres"),
11541154
EVENTS_CLICKHOUSE_MAX_TRACE_SUMMARY_VIEW_COUNT: z.coerce.number().int().default(25_000),
11551155
EVENTS_CLICKHOUSE_MAX_TRACE_DETAILED_SUMMARY_VIEW_COUNT: z.coerce.number().int().default(5_000),
11561156
EVENTS_CLICKHOUSE_MAX_LIVE_RELOADING_SETTING: z.coerce.number().int().default(2000),

apps/webapp/app/v3/eventRepository/clickhouseEventRepository.server.ts

Lines changed: 71 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import type {
44
TaskEventDetailsV1Result,
55
TaskEventSummaryV1Result,
66
TaskEventV1Input,
7+
TaskEventV2Input,
78
} from "@internal/clickhouse";
89
import { Attributes, startSpan, trace, Tracer } from "@internal/tracing";
910
import { createJsonErrorObject } from "@trigger.dev/core/v3/errors";
@@ -72,6 +73,12 @@ export type ClickhouseEventRepositoryConfig = {
7273
maximumTraceSummaryViewCount?: number;
7374
maximumTraceDetailedSummaryViewCount?: number;
7475
maximumLiveReloadingSetting?: number;
76+
/**
77+
* The version of the ClickHouse task_events table to use.
78+
* - "v1": Uses task_events_v1 (partitioned by start_time)
79+
* - "v2": Uses task_events_v2 (partitioned by inserted_at to avoid "too many parts" errors)
80+
*/
81+
version?: "v1" | "v2";
7582
};
7683

7784
/**
@@ -81,13 +88,15 @@ export type ClickhouseEventRepositoryConfig = {
8188
export class ClickhouseEventRepository implements IEventRepository {
8289
private _clickhouse: ClickHouse;
8390
private _config: ClickhouseEventRepositoryConfig;
84-
private readonly _flushScheduler: DynamicFlushScheduler<TaskEventV1Input>;
91+
private readonly _flushScheduler: DynamicFlushScheduler<TaskEventV1Input | TaskEventV2Input>;
8592
private _tracer: Tracer;
93+
private _version: "v1" | "v2";
8694

8795
constructor(config: ClickhouseEventRepositoryConfig) {
8896
this._clickhouse = config.clickhouse;
8997
this._config = config;
9098
this._tracer = config.tracer ?? trace.getTracer("clickhouseEventRepo", "0.0.1");
99+
this._version = config.version ?? "v1";
91100

92101
this._flushScheduler = new DynamicFlushScheduler({
93102
batchSize: config.batchSize ?? 1000,
@@ -99,31 +108,42 @@ export class ClickhouseEventRepository implements IEventRepository {
99108
memoryPressureThreshold: 10000,
100109
loadSheddingThreshold: 10000,
101110
loadSheddingEnabled: false,
102-
isDroppableEvent: (event: TaskEventV1Input) => {
111+
isDroppableEvent: (event: TaskEventV1Input | TaskEventV2Input) => {
103112
// Only drop LOG events during load shedding
104113
return event.kind === "DEBUG_EVENT";
105114
},
106115
});
107116
}
108117

118+
get version() {
119+
return this._version;
120+
}
121+
109122
get maximumLiveReloadingSetting() {
110123
return this._config.maximumLiveReloadingSetting ?? 1000;
111124
}
112125

113-
async #flushBatch(flushId: string, events: TaskEventV1Input[]) {
126+
async #flushBatch(flushId: string, events: (TaskEventV1Input | TaskEventV2Input)[]) {
114127
await startSpan(this._tracer, "flushBatch", async (span) => {
115128
span.setAttribute("flush_id", flushId);
116129
span.setAttribute("event_count", events.length);
130+
span.setAttribute("version", this._version);
117131

118132
const firstEvent = events[0];
119133

120134
if (firstEvent) {
121135
logger.debug("ClickhouseEventRepository.flushBatch first event", {
122136
event: firstEvent,
137+
version: this._version,
123138
});
124139
}
125140

126-
const [insertError, insertResult] = await this._clickhouse.taskEvents.insert(events, {
141+
const insertFn =
142+
this._version === "v2"
143+
? this._clickhouse.taskEventsV2.insert
144+
: this._clickhouse.taskEvents.insert;
145+
146+
const [insertError, insertResult] = await insertFn(events, {
127147
params: {
128148
clickhouse_settings: this.#getClickhouseInsertSettings(),
129149
},
@@ -136,6 +156,7 @@ export class ClickhouseEventRepository implements IEventRepository {
136156
logger.info("ClickhouseEventRepository.flushBatch Inserted batch into clickhouse", {
137157
events: events.length,
138158
insertResult,
159+
version: this._version,
139160
});
140161

141162
this.#publishToRedis(events);
@@ -155,7 +176,7 @@ export class ClickhouseEventRepository implements IEventRepository {
155176
}
156177
}
157178

158-
async #publishToRedis(events: TaskEventV1Input[]) {
179+
async #publishToRedis(events: (TaskEventV1Input | TaskEventV2Input)[]) {
159180
if (events.length === 0) return;
160181
await tracePubSub.publish(events.map((e) => e.trace_id));
161182
}
@@ -960,7 +981,10 @@ export class ClickhouseEventRepository implements IEventRepository {
960981
): Promise<TraceSummary | undefined> {
961982
const startCreatedAtWithBuffer = new Date(startCreatedAt.getTime() - 1000);
962983

963-
const queryBuilder = this._clickhouse.taskEvents.traceSummaryQueryBuilder();
984+
const queryBuilder =
985+
this._version === "v2"
986+
? this._clickhouse.taskEventsV2.traceSummaryQueryBuilder()
987+
: this._clickhouse.taskEvents.traceSummaryQueryBuilder();
964988

965989
queryBuilder.where("environment_id = {environmentId: String}", { environmentId });
966990
queryBuilder.where("trace_id = {traceId: String}", { traceId });
@@ -974,6 +998,14 @@ export class ClickhouseEventRepository implements IEventRepository {
974998
});
975999
}
9761000

1001+
// For v2, add inserted_at filtering for partition pruning
1002+
if (this._version === "v2") {
1003+
queryBuilder.where("inserted_at >= {insertedAtStart: DateTime64(3)}", {
1004+
insertedAtStart: convertDateToClickhouseDateTime(startCreatedAtWithBuffer),
1005+
});
1006+
// No upper bound on inserted_at - we want all events inserted up to now
1007+
}
1008+
9771009
if (options?.includeDebugLogs === false) {
9781010
queryBuilder.where("kind != {kind: String}", { kind: "DEBUG_EVENT" });
9791011
}
@@ -1058,7 +1090,10 @@ export class ClickhouseEventRepository implements IEventRepository {
10581090
): Promise<SpanDetail | undefined> {
10591091
const startCreatedAtWithBuffer = new Date(startCreatedAt.getTime() - 1000);
10601092

1061-
const queryBuilder = this._clickhouse.taskEvents.spanDetailsQueryBuilder();
1093+
const queryBuilder =
1094+
this._version === "v2"
1095+
? this._clickhouse.taskEventsV2.spanDetailsQueryBuilder()
1096+
: this._clickhouse.taskEvents.spanDetailsQueryBuilder();
10621097

10631098
queryBuilder.where("environment_id = {environmentId: String}", { environmentId });
10641099
queryBuilder.where("trace_id = {traceId: String}", { traceId });
@@ -1073,6 +1108,13 @@ export class ClickhouseEventRepository implements IEventRepository {
10731108
});
10741109
}
10751110

1111+
// For v2, add inserted_at filtering for partition pruning
1112+
if (this._version === "v2") {
1113+
queryBuilder.where("inserted_at >= {insertedAtStart: DateTime64(3)}", {
1114+
insertedAtStart: convertDateToClickhouseDateTime(startCreatedAtWithBuffer),
1115+
});
1116+
}
1117+
10761118
queryBuilder.orderBy("start_time ASC");
10771119

10781120
const [queryError, records] = await queryBuilder.execute();
@@ -1477,7 +1519,10 @@ export class ClickhouseEventRepository implements IEventRepository {
14771519
): Promise<TraceDetailedSummary | undefined> {
14781520
const startCreatedAtWithBuffer = new Date(startCreatedAt.getTime() - 1000);
14791521

1480-
const queryBuilder = this._clickhouse.taskEvents.traceDetailedSummaryQueryBuilder();
1522+
const queryBuilder =
1523+
this._version === "v2"
1524+
? this._clickhouse.taskEventsV2.traceDetailedSummaryQueryBuilder()
1525+
: this._clickhouse.taskEvents.traceDetailedSummaryQueryBuilder();
14811526

14821527
queryBuilder.where("environment_id = {environmentId: String}", { environmentId });
14831528
queryBuilder.where("trace_id = {traceId: String}", { traceId });
@@ -1491,6 +1536,13 @@ export class ClickhouseEventRepository implements IEventRepository {
14911536
});
14921537
}
14931538

1539+
// For v2, add inserted_at filtering for partition pruning
1540+
if (this._version === "v2") {
1541+
queryBuilder.where("inserted_at >= {insertedAtStart: DateTime64(3)}", {
1542+
insertedAtStart: convertDateToClickhouseDateTime(startCreatedAtWithBuffer),
1543+
});
1544+
}
1545+
14941546
if (options?.includeDebugLogs === false) {
14951547
queryBuilder.where("kind != {kind: String}", { kind: "DEBUG_EVENT" });
14961548
}
@@ -1675,7 +1727,10 @@ export class ClickhouseEventRepository implements IEventRepository {
16751727
): Promise<RunPreparedEvent[]> {
16761728
const startCreatedAtWithBuffer = new Date(startCreatedAt.getTime() - 1000);
16771729

1678-
const queryBuilder = this._clickhouse.taskEvents.traceSummaryQueryBuilder();
1730+
const queryBuilder =
1731+
this._version === "v2"
1732+
? this._clickhouse.taskEventsV2.traceSummaryQueryBuilder()
1733+
: this._clickhouse.taskEvents.traceSummaryQueryBuilder();
16791734

16801735
queryBuilder.where("environment_id = {environmentId: String}", { environmentId });
16811736
queryBuilder.where("trace_id = {traceId: String}", { traceId });
@@ -1690,6 +1745,13 @@ export class ClickhouseEventRepository implements IEventRepository {
16901745
});
16911746
}
16921747

1748+
// For v2, add inserted_at filtering for partition pruning
1749+
if (this._version === "v2") {
1750+
queryBuilder.where("inserted_at >= {insertedAtStart: DateTime64(3)}", {
1751+
insertedAtStart: convertDateToClickhouseDateTime(startCreatedAtWithBuffer),
1752+
});
1753+
}
1754+
16931755
queryBuilder.where("kind != {kind: String}", { kind: "DEBUG_EVENT" });
16941756
queryBuilder.orderBy("start_time ASC");
16951757

apps/webapp/app/v3/eventRepository/clickhouseEventRepositoryInstance.server.ts

Lines changed: 57 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,20 @@ export const clickhouseEventRepository = singleton(
88
initializeClickhouseRepository
99
);
1010

11-
function initializeClickhouseRepository() {
11+
export const clickhouseEventRepositoryV2 = singleton(
12+
"clickhouseEventRepositoryV2",
13+
initializeClickhouseRepositoryV2
14+
);
15+
16+
function getClickhouseClient() {
1217
if (!env.EVENTS_CLICKHOUSE_URL) {
1318
throw new Error("EVENTS_CLICKHOUSE_URL is not set");
1419
}
1520

1621
const url = new URL(env.EVENTS_CLICKHOUSE_URL);
1722
url.searchParams.delete("secure");
1823

19-
const safeUrl = new URL(url.toString());
20-
safeUrl.password = "redacted";
21-
22-
console.log("🗃️ Initializing Clickhouse event repository", { url: safeUrl.toString() });
23-
24-
const clickhouse = new ClickHouse({
24+
return new ClickHouse({
2525
url: url.toString(),
2626
name: "task-events",
2727
keepAlive: {
@@ -34,6 +34,55 @@ function initializeClickhouseRepository() {
3434
},
3535
maxOpenConnections: env.EVENTS_CLICKHOUSE_MAX_OPEN_CONNECTIONS,
3636
});
37+
}
38+
39+
function initializeClickhouseRepository() {
40+
if (!env.EVENTS_CLICKHOUSE_URL) {
41+
throw new Error("EVENTS_CLICKHOUSE_URL is not set");
42+
}
43+
44+
const url = new URL(env.EVENTS_CLICKHOUSE_URL);
45+
url.searchParams.delete("secure");
46+
47+
const safeUrl = new URL(url.toString());
48+
safeUrl.password = "redacted";
49+
50+
console.log("🗃️ Initializing Clickhouse event repository (v1)", { url: safeUrl.toString() });
51+
52+
const clickhouse = getClickhouseClient();
53+
54+
const repository = new ClickhouseEventRepository({
55+
clickhouse: clickhouse,
56+
batchSize: env.EVENTS_CLICKHOUSE_BATCH_SIZE,
57+
flushInterval: env.EVENTS_CLICKHOUSE_FLUSH_INTERVAL_MS,
58+
maximumTraceSummaryViewCount: env.EVENTS_CLICKHOUSE_MAX_TRACE_SUMMARY_VIEW_COUNT,
59+
maximumTraceDetailedSummaryViewCount:
60+
env.EVENTS_CLICKHOUSE_MAX_TRACE_DETAILED_SUMMARY_VIEW_COUNT,
61+
maximumLiveReloadingSetting: env.EVENTS_CLICKHOUSE_MAX_LIVE_RELOADING_SETTING,
62+
insertStrategy: env.EVENTS_CLICKHOUSE_INSERT_STRATEGY,
63+
waitForAsyncInsert: env.EVENTS_CLICKHOUSE_WAIT_FOR_ASYNC_INSERT === "1",
64+
asyncInsertMaxDataSize: env.EVENTS_CLICKHOUSE_ASYNC_INSERT_MAX_DATA_SIZE,
65+
asyncInsertBusyTimeoutMs: env.EVENTS_CLICKHOUSE_ASYNC_INSERT_BUSY_TIMEOUT_MS,
66+
version: "v1",
67+
});
68+
69+
return repository;
70+
}
71+
72+
function initializeClickhouseRepositoryV2() {
73+
if (!env.EVENTS_CLICKHOUSE_URL) {
74+
throw new Error("EVENTS_CLICKHOUSE_URL is not set");
75+
}
76+
77+
const url = new URL(env.EVENTS_CLICKHOUSE_URL);
78+
url.searchParams.delete("secure");
79+
80+
const safeUrl = new URL(url.toString());
81+
safeUrl.password = "redacted";
82+
83+
console.log("🗃️ Initializing Clickhouse event repository (v2)", { url: safeUrl.toString() });
84+
85+
const clickhouse = getClickhouseClient();
3786

3887
const repository = new ClickhouseEventRepository({
3988
clickhouse: clickhouse,
@@ -47,6 +96,7 @@ function initializeClickhouseRepository() {
4796
waitForAsyncInsert: env.EVENTS_CLICKHOUSE_WAIT_FOR_ASYNC_INSERT === "1",
4897
asyncInsertMaxDataSize: env.EVENTS_CLICKHOUSE_ASYNC_INSERT_MAX_DATA_SIZE,
4998
asyncInsertBusyTimeoutMs: env.EVENTS_CLICKHOUSE_ASYNC_INSERT_BUSY_TIMEOUT_MS,
99+
version: "v2",
50100
});
51101

52102
return repository;

0 commit comments

Comments
 (0)