Skip to content

Commit d69cfa5

Browse files
committed
batch task run seal update is atomic
1 parent 6c6de84 commit d69cfa5

File tree

2 files changed

+644
-8
lines changed

2 files changed

+644
-8
lines changed

apps/webapp/app/runEngine/services/streamBatchItems.server.ts

Lines changed: 70 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import {
44
BatchItemNDJSON as BatchItemNDJSONSchema,
55
} from "@trigger.dev/core/v3";
66
import { BatchId } from "@trigger.dev/core/v3/isomorphic";
7-
import type { BatchItem } from "@internal/run-engine";
7+
import type { BatchItem, RunEngine } from "@internal/run-engine";
88
import { prisma, type PrismaClientOrTransaction } from "~/db.server";
99
import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
1010
import { logger } from "~/services/logger.server";
@@ -15,6 +15,11 @@ export type StreamBatchItemsServiceOptions = {
1515
maxItemBytes: number;
1616
};
1717

18+
export type StreamBatchItemsServiceConstructorOptions = {
19+
prisma?: PrismaClientOrTransaction;
20+
engine?: RunEngine;
21+
};
22+
1823
/**
1924
* Stream Batch Items Service (Phase 2 of 2-phase batch API).
2025
*
@@ -31,8 +36,8 @@ export type StreamBatchItemsServiceOptions = {
3136
export class StreamBatchItemsService extends WithRunEngine {
3237
private readonly payloadProcessor: BatchPayloadProcessor;
3338

34-
constructor(protected readonly _prisma: PrismaClientOrTransaction = prisma) {
35-
super({ prisma });
39+
constructor(opts: StreamBatchItemsServiceConstructorOptions = {}) {
40+
super({ prisma: opts.prisma ?? prisma, engine: opts.engine });
3641
this.payloadProcessor = new BatchPayloadProcessor();
3742
}
3843

@@ -172,17 +177,74 @@ export class StreamBatchItemsService extends WithRunEngine {
172177
};
173178
}
174179

175-
// Seal the batch - update status to PROCESSING
176-
await this._prisma.batchTaskRun.update({
177-
where: { id: batchId },
180+
// Seal the batch - use conditional update to prevent TOCTOU race
181+
// Another concurrent request may have already sealed this batch
182+
const now = new Date();
183+
const sealResult = await this._prisma.batchTaskRun.updateMany({
184+
where: {
185+
id: batchId,
186+
sealed: false,
187+
status: "PENDING",
188+
},
178189
data: {
179190
sealed: true,
180-
sealedAt: new Date(),
191+
sealedAt: now,
181192
status: "PROCESSING",
182-
processingStartedAt: new Date(),
193+
processingStartedAt: now,
183194
},
184195
});
185196

197+
// Check if we won the race to seal the batch
198+
if (sealResult.count === 0) {
199+
// Another request sealed the batch first - re-query to check current state
200+
const currentBatch = await this._prisma.batchTaskRun.findUnique({
201+
where: { id: batchId },
202+
select: {
203+
id: true,
204+
friendlyId: true,
205+
status: true,
206+
sealed: true,
207+
},
208+
});
209+
210+
if (currentBatch?.sealed && currentBatch.status === "PROCESSING") {
211+
// The batch was sealed by another request - this is fine, the goal was achieved
212+
logger.info("Batch already sealed by concurrent request", {
213+
batchId: batchFriendlyId,
214+
itemsAccepted,
215+
itemsDeduplicated,
216+
envId: environment.id,
217+
});
218+
219+
span.setAttribute("itemsAccepted", itemsAccepted);
220+
span.setAttribute("itemsDeduplicated", itemsDeduplicated);
221+
span.setAttribute("sealedByConcurrentRequest", true);
222+
223+
return {
224+
id: batchFriendlyId,
225+
itemsAccepted,
226+
itemsDeduplicated,
227+
sealed: true,
228+
};
229+
}
230+
231+
// Batch is in an unexpected state - fail with error
232+
const actualStatus = currentBatch?.status ?? "unknown";
233+
const actualSealed = currentBatch?.sealed ?? "unknown";
234+
logger.error("Batch seal race condition: unexpected state", {
235+
batchId: batchFriendlyId,
236+
expectedStatus: "PENDING",
237+
actualStatus,
238+
expectedSealed: false,
239+
actualSealed,
240+
envId: environment.id,
241+
});
242+
243+
throw new ServiceValidationError(
244+
`Batch ${batchFriendlyId} is in unexpected state (status: ${actualStatus}, sealed: ${actualSealed}). Cannot seal batch.`
245+
);
246+
}
247+
186248
logger.info("Batch sealed and ready for processing", {
187249
batchId: batchFriendlyId,
188250
itemsAccepted,

0 commit comments

Comments
 (0)