Skip to content

Commit 6c6de84

Browse files
committed
Add unique index to batch task run errors on the index field and more efficiently create errors in the completion callback
1 parent 4d76367 commit 6c6de84

File tree

3 files changed

+31
-19
lines changed

3 files changed

+31
-19
lines changed

apps/webapp/app/v3/runEngineHandlers.server.ts

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -746,35 +746,38 @@ export function setupBatchQueueCallbacks() {
746746
}
747747

748748
try {
749-
// Update BatchTaskRun
750-
await prisma.batchTaskRun.update({
751-
where: { id: batchId },
752-
data: {
753-
status,
754-
runIds,
755-
successfulRunCount,
756-
failedRunCount,
757-
completedAt: status === "ABORTED" ? new Date() : undefined,
758-
processingCompletedAt: new Date(),
759-
},
760-
});
749+
// Use a transaction to ensure atomicity of batch update and error record creation
750+
// skipDuplicates handles idempotency when callback is retried (relies on unique constraint)
751+
await prisma.$transaction(async (tx) => {
752+
// Update BatchTaskRun
753+
await tx.batchTaskRun.update({
754+
where: { id: batchId },
755+
data: {
756+
status,
757+
runIds,
758+
successfulRunCount,
759+
failedRunCount,
760+
completedAt: status === "ABORTED" ? new Date() : undefined,
761+
processingCompletedAt: new Date(),
762+
},
763+
});
761764

762-
// Create error records if there were failures
763-
if (failures.length > 0) {
764-
for (const failure of failures) {
765-
await prisma.batchTaskRunError.create({
766-
data: {
765+
// Create error records if there were failures
766+
if (failures.length > 0) {
767+
await tx.batchTaskRunError.createMany({
768+
data: failures.map((failure) => ({
767769
batchTaskRunId: batchId,
768770
index: failure.index,
769771
taskIdentifier: failure.taskIdentifier,
770772
payload: failure.payload,
771773
options: failure.options as Prisma.InputJsonValue | undefined,
772774
error: failure.error,
773775
errorCode: failure.errorCode,
774-
},
776+
})),
777+
skipDuplicates: true,
775778
});
776779
}
777-
}
780+
});
778781

779782
// Try to complete the batch (handles waitpoint completion if all runs are done)
780783
if (status !== "ABORTED") {
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
/*
2+
Warnings:
3+
4+
- A unique constraint covering the columns `[batchTaskRunId,index]` on the table `BatchTaskRunError` will be added. If there are existing duplicate values, this will fail.
5+
6+
*/
7+
-- CreateIndex
8+
CREATE UNIQUE INDEX "BatchTaskRunError_batchTaskRunId_index_key" ON "public"."BatchTaskRunError"("batchTaskRunId", "index");

internal-packages/database/prisma/schema.prisma

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1661,6 +1661,7 @@ model BatchTaskRunError {
16611661
16621662
createdAt DateTime @default(now())
16631663
1664+
@@unique([batchTaskRunId, index])
16641665
@@index([batchTaskRunId])
16651666
}
16661667

0 commit comments

Comments
 (0)