Skip to content

Commit 71d67e1

Browse files
committed
fix(fair-queue): prevent unbounded memory growth by cleaning up queue descriptor and cooloff state cache
1 parent 2eba36c commit 71d67e1

File tree

3 files changed

+221
-186
lines changed

3 files changed

+221
-186
lines changed

internal-packages/run-engine/src/batch-queue/index.ts

Lines changed: 162 additions & 180 deletions
Original file line numberDiff line numberDiff line change
@@ -537,152 +537,101 @@ export class BatchQueue {
537537
}): Promise<void> {
538538
const { batchId, friendlyId, itemIndex, item } = ctx.message.payload;
539539

540-
return this.#startSpan(
541-
"BatchQueue.handleMessage",
542-
async (span) => {
543-
span?.setAttributes({
544-
"batch.id": batchId,
545-
"batch.friendlyId": friendlyId,
546-
"batch.itemIndex": itemIndex,
547-
"batch.task": item.task,
548-
"batch.consumerId": ctx.consumerId,
549-
"batch.attempt": ctx.message.attempt,
550-
});
551-
552-
// Record queue time metric (time from enqueue to processing)
553-
const queueTimeMs = Date.now() - ctx.message.timestamp;
554-
this.itemQueueTimeHistogram?.record(queueTimeMs, { envId: ctx.queue.tenantId });
555-
span?.setAttribute("batch.queueTimeMs", queueTimeMs);
556-
557-
this.logger.debug("Processing batch item", {
558-
batchId,
559-
friendlyId,
560-
itemIndex,
561-
task: item.task,
562-
consumerId: ctx.consumerId,
563-
attempt: ctx.message.attempt,
564-
queueTimeMs,
565-
});
566-
567-
if (!this.processItemCallback) {
568-
this.logger.error("No process item callback set", { batchId, itemIndex });
569-
// Still complete the message to avoid blocking
570-
await ctx.complete();
571-
return;
572-
}
540+
return this.#startSpan("BatchQueue.handleMessage", async (span) => {
541+
span?.setAttributes({
542+
"batch.id": batchId,
543+
"batch.friendlyId": friendlyId,
544+
"batch.itemIndex": itemIndex,
545+
"batch.task": item.task,
546+
"batch.consumerId": ctx.consumerId,
547+
"batch.attempt": ctx.message.attempt,
548+
});
573549

574-
// Get batch metadata
575-
const meta = await this.#startSpan("BatchQueue.getMeta", async () => {
576-
return this.completionTracker.getMeta(batchId);
577-
});
550+
// Record queue time metric (time from enqueue to processing)
551+
const queueTimeMs = Date.now() - ctx.message.timestamp;
552+
this.itemQueueTimeHistogram?.record(queueTimeMs, { envId: ctx.queue.tenantId });
553+
span?.setAttribute("batch.queueTimeMs", queueTimeMs);
578554

579-
if (!meta) {
580-
this.logger.error("Batch metadata not found", { batchId, itemIndex });
581-
await ctx.complete();
582-
return;
583-
}
555+
this.logger.debug("Processing batch item", {
556+
batchId,
557+
friendlyId,
558+
itemIndex,
559+
task: item.task,
560+
consumerId: ctx.consumerId,
561+
attempt: ctx.message.attempt,
562+
queueTimeMs,
563+
});
584564

585-
span?.setAttributes({
586-
"batch.runCount": meta.runCount,
587-
"batch.environmentId": meta.environmentId,
588-
});
565+
if (!this.processItemCallback) {
566+
this.logger.error("No process item callback set", { batchId, itemIndex });
567+
// Still complete the message to avoid blocking
568+
await ctx.complete();
569+
return;
570+
}
589571

590-
let processedCount: number;
572+
// Get batch metadata
573+
const meta = await this.#startSpan("BatchQueue.getMeta", async () => {
574+
return this.completionTracker.getMeta(batchId);
575+
});
591576

592-
try {
593-
const result = await this.#startSpan(
594-
"BatchQueue.processItemCallback",
595-
async (innerSpan) => {
596-
innerSpan?.setAttributes({
597-
"batch.id": batchId,
598-
"batch.itemIndex": itemIndex,
599-
"batch.task": item.task,
600-
});
601-
return this.processItemCallback!({
602-
batchId,
603-
friendlyId,
604-
itemIndex,
605-
item,
606-
meta,
607-
});
608-
}
609-
);
577+
if (!meta) {
578+
this.logger.error("Batch metadata not found", { batchId, itemIndex });
579+
await ctx.complete();
580+
return;
581+
}
610582

611-
if (result.success) {
612-
span?.setAttribute("batch.result", "success");
613-
span?.setAttribute("batch.runId", result.runId);
583+
span?.setAttributes({
584+
"batch.runCount": meta.runCount,
585+
"batch.environmentId": meta.environmentId,
586+
});
614587

615-
// Pass itemIndex for idempotency - prevents double-counting on redelivery
616-
processedCount = await this.#startSpan(
617-
"BatchQueue.recordSuccess",
618-
async () => {
619-
return this.completionTracker.recordSuccess(batchId, result.runId, itemIndex);
620-
}
621-
);
588+
let processedCount: number;
622589

623-
this.itemsProcessedCounter?.add(1, { envId: meta.environmentId });
624-
this.logger.debug("Batch item processed successfully", {
590+
try {
591+
const result = await this.#startSpan(
592+
"BatchQueue.processItemCallback",
593+
async (innerSpan) => {
594+
innerSpan?.setAttributes({
595+
"batch.id": batchId,
596+
"batch.itemIndex": itemIndex,
597+
"batch.task": item.task,
598+
});
599+
return this.processItemCallback!({
625600
batchId,
601+
friendlyId,
626602
itemIndex,
627-
runId: result.runId,
628-
processedCount,
629-
expectedCount: meta.runCount,
603+
item,
604+
meta,
630605
});
631-
} else {
632-
span?.setAttribute("batch.result", "failure");
633-
span?.setAttribute("batch.error", result.error);
634-
if (result.errorCode) {
635-
span?.setAttribute("batch.errorCode", result.errorCode);
636-
}
606+
}
607+
);
637608

638-
// For offloaded payloads (payloadType: "application/store"), payload is already an R2 path
639-
// For inline payloads, store the full payload - it's under the offload threshold anyway
640-
const payloadStr = await this.#startSpan(
641-
"BatchQueue.serializePayload",
642-
async (innerSpan) => {
643-
const str =
644-
typeof item.payload === "string" ? item.payload : JSON.stringify(item.payload);
645-
innerSpan?.setAttribute("batch.payloadSize", str.length);
646-
return str;
647-
}
648-
);
649-
650-
processedCount = await this.#startSpan(
651-
"BatchQueue.recordFailure",
652-
async () => {
653-
return this.completionTracker.recordFailure(batchId, {
654-
index: itemIndex,
655-
taskIdentifier: item.task,
656-
payload: payloadStr,
657-
options: item.options,
658-
error: result.error,
659-
errorCode: result.errorCode,
660-
});
661-
}
662-
);
663-
664-
this.itemsFailedCounter?.add(1, {
665-
envId: meta.environmentId,
666-
errorCode: result.errorCode,
667-
});
609+
if (result.success) {
610+
span?.setAttribute("batch.result", "success");
611+
span?.setAttribute("batch.runId", result.runId);
668612

669-
this.logger.error("Batch item processing failed", {
670-
batchId,
671-
itemIndex,
672-
error: result.error,
673-
processedCount,
674-
expectedCount: meta.runCount,
675-
});
613+
// Pass itemIndex for idempotency - prevents double-counting on redelivery
614+
processedCount = await this.#startSpan("BatchQueue.recordSuccess", async () => {
615+
return this.completionTracker.recordSuccess(batchId, result.runId, itemIndex);
616+
});
617+
618+
this.itemsProcessedCounter?.add(1, { envId: meta.environmentId });
619+
this.logger.debug("Batch item processed successfully", {
620+
batchId,
621+
itemIndex,
622+
runId: result.runId,
623+
processedCount,
624+
expectedCount: meta.runCount,
625+
});
626+
} else {
627+
span?.setAttribute("batch.result", "failure");
628+
span?.setAttribute("batch.error", result.error);
629+
if (result.errorCode) {
630+
span?.setAttribute("batch.errorCode", result.errorCode);
676631
}
677-
} catch (error) {
678-
span?.setAttribute("batch.result", "unexpected_error");
679-
span?.setAttribute(
680-
"batch.error",
681-
error instanceof Error ? error.message : String(error)
682-
);
683632

684-
// Unexpected error during processing
685-
// For offloaded payloads, payload is an R2 path; for inline payloads, store full payload
633+
// For offloaded payloads (payloadType: "application/store"), payload is already an R2 path
634+
// For inline payloads, store the full payload - it's under the offload threshold anyway
686635
const payloadStr = await this.#startSpan(
687636
"BatchQueue.serializePayload",
688637
async (innerSpan) => {
@@ -693,56 +642,92 @@ export class BatchQueue {
693642
}
694643
);
695644

696-
processedCount = await this.#startSpan(
697-
"BatchQueue.recordFailure",
698-
async () => {
699-
return this.completionTracker.recordFailure(batchId, {
700-
index: itemIndex,
701-
taskIdentifier: item.task,
702-
payload: payloadStr,
703-
options: item.options,
704-
error: error instanceof Error ? error.message : String(error),
705-
errorCode: "UNEXPECTED_ERROR",
706-
});
707-
}
708-
);
645+
processedCount = await this.#startSpan("BatchQueue.recordFailure", async () => {
646+
return this.completionTracker.recordFailure(batchId, {
647+
index: itemIndex,
648+
taskIdentifier: item.task,
649+
payload: payloadStr,
650+
options: item.options,
651+
error: result.error,
652+
errorCode: result.errorCode,
653+
});
654+
});
709655

710656
this.itemsFailedCounter?.add(1, {
711657
envId: meta.environmentId,
712-
errorCode: "UNEXPECTED_ERROR",
658+
errorCode: result.errorCode,
713659
});
714-
this.logger.error("Unexpected error processing batch item", {
660+
661+
this.logger.error("Batch item processing failed", {
715662
batchId,
716663
itemIndex,
717-
error: error instanceof Error ? error.message : String(error),
664+
error: result.error,
718665
processedCount,
719666
expectedCount: meta.runCount,
720667
});
721668
}
669+
} catch (error) {
670+
span?.setAttribute("batch.result", "unexpected_error");
671+
span?.setAttribute("batch.error", error instanceof Error ? error.message : String(error));
672+
673+
// Unexpected error during processing
674+
// For offloaded payloads, payload is an R2 path; for inline payloads, store full payload
675+
const payloadStr = await this.#startSpan(
676+
"BatchQueue.serializePayload",
677+
async (innerSpan) => {
678+
const str =
679+
typeof item.payload === "string" ? item.payload : JSON.stringify(item.payload);
680+
innerSpan?.setAttribute("batch.payloadSize", str.length);
681+
return str;
682+
}
683+
);
684+
685+
processedCount = await this.#startSpan("BatchQueue.recordFailure", async () => {
686+
return this.completionTracker.recordFailure(batchId, {
687+
index: itemIndex,
688+
taskIdentifier: item.task,
689+
payload: payloadStr,
690+
options: item.options,
691+
error: error instanceof Error ? error.message : String(error),
692+
errorCode: "UNEXPECTED_ERROR",
693+
});
694+
});
722695

723-
span?.setAttribute("batch.processedCount", processedCount);
724-
725-
// Complete the FairQueue message (no retry for batch items)
726-
// This must happen after recording success/failure to ensure the counter
727-
// is updated before the message is considered done
728-
await this.#startSpan("BatchQueue.completeMessage", async () => {
729-
return ctx.complete();
696+
this.itemsFailedCounter?.add(1, {
697+
envId: meta.environmentId,
698+
errorCode: "UNEXPECTED_ERROR",
730699
});
700+
this.logger.error("Unexpected error processing batch item", {
701+
batchId,
702+
itemIndex,
703+
error: error instanceof Error ? error.message : String(error),
704+
processedCount,
705+
expectedCount: meta.runCount,
706+
});
707+
}
731708

732-
// Check if all items have been processed using atomic counter
733-
// This is safe even with multiple concurrent consumers because
734-
// the processedCount is atomically incremented and we only trigger
735-
// finalization when we see the exact final count
736-
if (processedCount === meta.runCount) {
737-
this.logger.debug("All items processed, finalizing batch", {
738-
batchId,
739-
processedCount,
740-
expectedCount: meta.runCount,
741-
});
742-
await this.#finalizeBatch(batchId, meta);
743-
}
709+
span?.setAttribute("batch.processedCount", processedCount);
710+
711+
// Complete the FairQueue message (no retry for batch items)
712+
// This must happen after recording success/failure to ensure the counter
713+
// is updated before the message is considered done
714+
await this.#startSpan("BatchQueue.completeMessage", async () => {
715+
return ctx.complete();
716+
});
717+
718+
// Check if all items have been processed using atomic counter
719+
// This is safe even with multiple concurrent consumers because
720+
// the processedCount is atomically incremented and we only trigger
721+
// finalization when we see the exact final count
722+
if (processedCount === meta.runCount) {
723+
this.logger.debug("All items processed, finalizing batch", {
724+
batchId,
725+
processedCount,
726+
expectedCount: meta.runCount,
727+
});
728+
await this.#finalizeBatch(batchId, meta);
744729
}
745-
);
730+
});
746731
}
747732

748733
/**
@@ -757,19 +742,16 @@ export class BatchQueue {
757742
"batch.environmentId": meta.environmentId,
758743
});
759744

760-
const result = await this.#startSpan(
761-
"BatchQueue.getCompletionResult",
762-
async (innerSpan) => {
763-
const completionResult = await this.completionTracker.getCompletionResult(batchId);
764-
innerSpan?.setAttributes({
765-
"batch.successfulRunCount": completionResult.successfulRunCount,
766-
"batch.failedRunCount": completionResult.failedRunCount,
767-
"batch.runIdsCount": completionResult.runIds.length,
768-
"batch.failuresCount": completionResult.failures.length,
769-
});
770-
return completionResult;
771-
}
772-
);
745+
const result = await this.#startSpan("BatchQueue.getCompletionResult", async (innerSpan) => {
746+
const completionResult = await this.completionTracker.getCompletionResult(batchId);
747+
innerSpan?.setAttributes({
748+
"batch.successfulRunCount": completionResult.successfulRunCount,
749+
"batch.failedRunCount": completionResult.failedRunCount,
750+
"batch.runIdsCount": completionResult.runIds.length,
751+
"batch.failuresCount": completionResult.failures.length,
752+
});
753+
return completionResult;
754+
});
773755

774756
span?.setAttributes({
775757
"batch.successfulRunCount": result.successfulRunCount,

0 commit comments

Comments
 (0)