@@ -537,152 +537,101 @@ export class BatchQueue {
537537 } ) : Promise < void > {
538538 const { batchId, friendlyId, itemIndex, item } = ctx . message . payload ;
539539
540- return this . #startSpan(
541- "BatchQueue.handleMessage" ,
542- async ( span ) => {
543- span ?. setAttributes ( {
544- "batch.id" : batchId ,
545- "batch.friendlyId" : friendlyId ,
546- "batch.itemIndex" : itemIndex ,
547- "batch.task" : item . task ,
548- "batch.consumerId" : ctx . consumerId ,
549- "batch.attempt" : ctx . message . attempt ,
550- } ) ;
551-
552- // Record queue time metric (time from enqueue to processing)
553- const queueTimeMs = Date . now ( ) - ctx . message . timestamp ;
554- this . itemQueueTimeHistogram ?. record ( queueTimeMs , { envId : ctx . queue . tenantId } ) ;
555- span ?. setAttribute ( "batch.queueTimeMs" , queueTimeMs ) ;
556-
557- this . logger . debug ( "Processing batch item" , {
558- batchId,
559- friendlyId,
560- itemIndex,
561- task : item . task ,
562- consumerId : ctx . consumerId ,
563- attempt : ctx . message . attempt ,
564- queueTimeMs,
565- } ) ;
566-
567- if ( ! this . processItemCallback ) {
568- this . logger . error ( "No process item callback set" , { batchId, itemIndex } ) ;
569- // Still complete the message to avoid blocking
570- await ctx . complete ( ) ;
571- return ;
572- }
540+ return this . #startSpan( "BatchQueue.handleMessage" , async ( span ) => {
541+ span ?. setAttributes ( {
542+ "batch.id" : batchId ,
543+ "batch.friendlyId" : friendlyId ,
544+ "batch.itemIndex" : itemIndex ,
545+ "batch.task" : item . task ,
546+ "batch.consumerId" : ctx . consumerId ,
547+ "batch.attempt" : ctx . message . attempt ,
548+ } ) ;
573549
574- // Get batch metadata
575- const meta = await this . #startSpan ( "BatchQueue.getMeta" , async ( ) => {
576- return this . completionTracker . getMeta ( batchId ) ;
577- } ) ;
550+ // Record queue time metric (time from enqueue to processing)
551+ const queueTimeMs = Date . now ( ) - ctx . message . timestamp ;
552+ this . itemQueueTimeHistogram ?. record ( queueTimeMs , { envId : ctx . queue . tenantId } ) ;
553+ span ?. setAttribute ( "batch.queueTimeMs" , queueTimeMs ) ;
578554
579- if ( ! meta ) {
580- this . logger . error ( "Batch metadata not found" , { batchId, itemIndex } ) ;
581- await ctx . complete ( ) ;
582- return ;
583- }
555+ this . logger . debug ( "Processing batch item" , {
556+ batchId,
557+ friendlyId,
558+ itemIndex,
559+ task : item . task ,
560+ consumerId : ctx . consumerId ,
561+ attempt : ctx . message . attempt ,
562+ queueTimeMs,
563+ } ) ;
584564
585- span ?. setAttributes ( {
586- "batch.runCount" : meta . runCount ,
587- "batch.environmentId" : meta . environmentId ,
588- } ) ;
565+ if ( ! this . processItemCallback ) {
566+ this . logger . error ( "No process item callback set" , { batchId, itemIndex } ) ;
567+ // Still complete the message to avoid blocking
568+ await ctx . complete ( ) ;
569+ return ;
570+ }
589571
590- let processedCount : number ;
572+ // Get batch metadata
573+ const meta = await this . #startSpan( "BatchQueue.getMeta" , async ( ) => {
574+ return this . completionTracker . getMeta ( batchId ) ;
575+ } ) ;
591576
592- try {
593- const result = await this . #startSpan(
594- "BatchQueue.processItemCallback" ,
595- async ( innerSpan ) => {
596- innerSpan ?. setAttributes ( {
597- "batch.id" : batchId ,
598- "batch.itemIndex" : itemIndex ,
599- "batch.task" : item . task ,
600- } ) ;
601- return this . processItemCallback ! ( {
602- batchId,
603- friendlyId,
604- itemIndex,
605- item,
606- meta,
607- } ) ;
608- }
609- ) ;
577+ if ( ! meta ) {
578+ this . logger . error ( "Batch metadata not found" , { batchId, itemIndex } ) ;
579+ await ctx . complete ( ) ;
580+ return ;
581+ }
610582
611- if ( result . success ) {
612- span ?. setAttribute ( "batch.result" , "success" ) ;
613- span ?. setAttribute ( "batch.runId" , result . runId ) ;
583+ span ?. setAttributes ( {
584+ "batch.runCount" : meta . runCount ,
585+ "batch.environmentId" : meta . environmentId ,
586+ } ) ;
614587
615- // Pass itemIndex for idempotency - prevents double-counting on redelivery
616- processedCount = await this . #startSpan(
617- "BatchQueue.recordSuccess" ,
618- async ( ) => {
619- return this . completionTracker . recordSuccess ( batchId , result . runId , itemIndex ) ;
620- }
621- ) ;
588+ let processedCount : number ;
622589
623- this . itemsProcessedCounter ?. add ( 1 , { envId : meta . environmentId } ) ;
624- this . logger . debug ( "Batch item processed successfully" , {
590+ try {
591+ const result = await this . #startSpan(
592+ "BatchQueue.processItemCallback" ,
593+ async ( innerSpan ) => {
594+ innerSpan ?. setAttributes ( {
595+ "batch.id" : batchId ,
596+ "batch.itemIndex" : itemIndex ,
597+ "batch.task" : item . task ,
598+ } ) ;
599+ return this . processItemCallback ! ( {
625600 batchId,
601+ friendlyId,
626602 itemIndex,
627- runId : result . runId ,
628- processedCount,
629- expectedCount : meta . runCount ,
603+ item,
604+ meta,
630605 } ) ;
631- } else {
632- span ?. setAttribute ( "batch.result" , "failure" ) ;
633- span ?. setAttribute ( "batch.error" , result . error ) ;
634- if ( result . errorCode ) {
635- span ?. setAttribute ( "batch.errorCode" , result . errorCode ) ;
636- }
606+ }
607+ ) ;
637608
638- // For offloaded payloads (payloadType: "application/store"), payload is already an R2 path
639- // For inline payloads, store the full payload - it's under the offload threshold anyway
640- const payloadStr = await this . #startSpan(
641- "BatchQueue.serializePayload" ,
642- async ( innerSpan ) => {
643- const str =
644- typeof item . payload === "string" ? item . payload : JSON . stringify ( item . payload ) ;
645- innerSpan ?. setAttribute ( "batch.payloadSize" , str . length ) ;
646- return str ;
647- }
648- ) ;
649-
650- processedCount = await this . #startSpan(
651- "BatchQueue.recordFailure" ,
652- async ( ) => {
653- return this . completionTracker . recordFailure ( batchId , {
654- index : itemIndex ,
655- taskIdentifier : item . task ,
656- payload : payloadStr ,
657- options : item . options ,
658- error : result . error ,
659- errorCode : result . errorCode ,
660- } ) ;
661- }
662- ) ;
663-
664- this . itemsFailedCounter ?. add ( 1 , {
665- envId : meta . environmentId ,
666- errorCode : result . errorCode ,
667- } ) ;
609+ if ( result . success ) {
610+ span ?. setAttribute ( "batch.result" , "success" ) ;
611+ span ?. setAttribute ( "batch.runId" , result . runId ) ;
668612
669- this . logger . error ( "Batch item processing failed" , {
670- batchId,
671- itemIndex,
672- error : result . error ,
673- processedCount,
674- expectedCount : meta . runCount ,
675- } ) ;
613+ // Pass itemIndex for idempotency - prevents double-counting on redelivery
614+ processedCount = await this . #startSpan( "BatchQueue.recordSuccess" , async ( ) => {
615+ return this . completionTracker . recordSuccess ( batchId , result . runId , itemIndex ) ;
616+ } ) ;
617+
618+ this . itemsProcessedCounter ?. add ( 1 , { envId : meta . environmentId } ) ;
619+ this . logger . debug ( "Batch item processed successfully" , {
620+ batchId,
621+ itemIndex,
622+ runId : result . runId ,
623+ processedCount,
624+ expectedCount : meta . runCount ,
625+ } ) ;
626+ } else {
627+ span ?. setAttribute ( "batch.result" , "failure" ) ;
628+ span ?. setAttribute ( "batch.error" , result . error ) ;
629+ if ( result . errorCode ) {
630+ span ?. setAttribute ( "batch.errorCode" , result . errorCode ) ;
676631 }
677- } catch ( error ) {
678- span ?. setAttribute ( "batch.result" , "unexpected_error" ) ;
679- span ?. setAttribute (
680- "batch.error" ,
681- error instanceof Error ? error . message : String ( error )
682- ) ;
683632
684- // Unexpected error during processing
685- // For offloaded payloads, payload is an R2 path; for inline payloads, store full payload
633+ // For offloaded payloads (payloadType: "application/store"), payload is already an R2 path
634+ // For inline payloads, store the full payload - it's under the offload threshold anyway
686635 const payloadStr = await this . #startSpan(
687636 "BatchQueue.serializePayload" ,
688637 async ( innerSpan ) => {
@@ -693,56 +642,92 @@ export class BatchQueue {
693642 }
694643 ) ;
695644
696- processedCount = await this . #startSpan(
697- "BatchQueue.recordFailure" ,
698- async ( ) => {
699- return this . completionTracker . recordFailure ( batchId , {
700- index : itemIndex ,
701- taskIdentifier : item . task ,
702- payload : payloadStr ,
703- options : item . options ,
704- error : error instanceof Error ? error . message : String ( error ) ,
705- errorCode : "UNEXPECTED_ERROR" ,
706- } ) ;
707- }
708- ) ;
645+ processedCount = await this . #startSpan( "BatchQueue.recordFailure" , async ( ) => {
646+ return this . completionTracker . recordFailure ( batchId , {
647+ index : itemIndex ,
648+ taskIdentifier : item . task ,
649+ payload : payloadStr ,
650+ options : item . options ,
651+ error : result . error ,
652+ errorCode : result . errorCode ,
653+ } ) ;
654+ } ) ;
709655
710656 this . itemsFailedCounter ?. add ( 1 , {
711657 envId : meta . environmentId ,
712- errorCode : "UNEXPECTED_ERROR" ,
658+ errorCode : result . errorCode ,
713659 } ) ;
714- this . logger . error ( "Unexpected error processing batch item" , {
660+
661+ this . logger . error ( "Batch item processing failed" , {
715662 batchId,
716663 itemIndex,
717- error : error instanceof Error ? error . message : String ( error ) ,
664+ error : result . error ,
718665 processedCount,
719666 expectedCount : meta . runCount ,
720667 } ) ;
721668 }
669+ } catch ( error ) {
670+ span ?. setAttribute ( "batch.result" , "unexpected_error" ) ;
671+ span ?. setAttribute ( "batch.error" , error instanceof Error ? error . message : String ( error ) ) ;
672+
673+ // Unexpected error during processing
674+ // For offloaded payloads, payload is an R2 path; for inline payloads, store full payload
675+ const payloadStr = await this . #startSpan(
676+ "BatchQueue.serializePayload" ,
677+ async ( innerSpan ) => {
678+ const str =
679+ typeof item . payload === "string" ? item . payload : JSON . stringify ( item . payload ) ;
680+ innerSpan ?. setAttribute ( "batch.payloadSize" , str . length ) ;
681+ return str ;
682+ }
683+ ) ;
684+
685+ processedCount = await this . #startSpan( "BatchQueue.recordFailure" , async ( ) => {
686+ return this . completionTracker . recordFailure ( batchId , {
687+ index : itemIndex ,
688+ taskIdentifier : item . task ,
689+ payload : payloadStr ,
690+ options : item . options ,
691+ error : error instanceof Error ? error . message : String ( error ) ,
692+ errorCode : "UNEXPECTED_ERROR" ,
693+ } ) ;
694+ } ) ;
722695
723- span ?. setAttribute ( "batch.processedCount" , processedCount ) ;
724-
725- // Complete the FairQueue message (no retry for batch items)
726- // This must happen after recording success/failure to ensure the counter
727- // is updated before the message is considered done
728- await this . #startSpan( "BatchQueue.completeMessage" , async ( ) => {
729- return ctx . complete ( ) ;
696+ this . itemsFailedCounter ?. add ( 1 , {
697+ envId : meta . environmentId ,
698+ errorCode : "UNEXPECTED_ERROR" ,
730699 } ) ;
700+ this . logger . error ( "Unexpected error processing batch item" , {
701+ batchId,
702+ itemIndex,
703+ error : error instanceof Error ? error . message : String ( error ) ,
704+ processedCount,
705+ expectedCount : meta . runCount ,
706+ } ) ;
707+ }
731708
732- // Check if all items have been processed using atomic counter
733- // This is safe even with multiple concurrent consumers because
734- // the processedCount is atomically incremented and we only trigger
735- // finalization when we see the exact final count
736- if ( processedCount === meta . runCount ) {
737- this . logger . debug ( "All items processed, finalizing batch" , {
738- batchId,
739- processedCount,
740- expectedCount : meta . runCount ,
741- } ) ;
742- await this . #finalizeBatch( batchId , meta ) ;
743- }
709+ span ?. setAttribute ( "batch.processedCount" , processedCount ) ;
710+
711+ // Complete the FairQueue message (no retry for batch items)
712+ // This must happen after recording success/failure to ensure the counter
713+ // is updated before the message is considered done
714+ await this . #startSpan( "BatchQueue.completeMessage" , async ( ) => {
715+ return ctx . complete ( ) ;
716+ } ) ;
717+
718+ // Check if all items have been processed using atomic counter
719+ // This is safe even with multiple concurrent consumers because
720+ // the processedCount is atomically incremented and we only trigger
721+ // finalization when we see the exact final count
722+ if ( processedCount === meta . runCount ) {
723+ this . logger . debug ( "All items processed, finalizing batch" , {
724+ batchId,
725+ processedCount,
726+ expectedCount : meta . runCount ,
727+ } ) ;
728+ await this . #finalizeBatch( batchId , meta ) ;
744729 }
745- ) ;
730+ } ) ;
746731 }
747732
748733 /**
@@ -757,19 +742,16 @@ export class BatchQueue {
757742 "batch.environmentId" : meta . environmentId ,
758743 } ) ;
759744
760- const result = await this . #startSpan(
761- "BatchQueue.getCompletionResult" ,
762- async ( innerSpan ) => {
763- const completionResult = await this . completionTracker . getCompletionResult ( batchId ) ;
764- innerSpan ?. setAttributes ( {
765- "batch.successfulRunCount" : completionResult . successfulRunCount ,
766- "batch.failedRunCount" : completionResult . failedRunCount ,
767- "batch.runIdsCount" : completionResult . runIds . length ,
768- "batch.failuresCount" : completionResult . failures . length ,
769- } ) ;
770- return completionResult ;
771- }
772- ) ;
745+ const result = await this . #startSpan( "BatchQueue.getCompletionResult" , async ( innerSpan ) => {
746+ const completionResult = await this . completionTracker . getCompletionResult ( batchId ) ;
747+ innerSpan ?. setAttributes ( {
748+ "batch.successfulRunCount" : completionResult . successfulRunCount ,
749+ "batch.failedRunCount" : completionResult . failedRunCount ,
750+ "batch.runIdsCount" : completionResult . runIds . length ,
751+ "batch.failuresCount" : completionResult . failures . length ,
752+ } ) ;
753+ return completionResult ;
754+ } ) ;
773755
774756 span ?. setAttributes ( {
775757 "batch.successfulRunCount" : result . successfulRunCount ,
0 commit comments