@@ -89,6 +89,7 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
8989 private cooloffEnabled : boolean ;
9090 private cooloffThreshold : number ;
9191 private cooloffPeriodMs : number ;
92+ private maxCooloffStatesSize : number ;
9293 private queueCooloffStates = new Map < string , QueueCooloffState > ( ) ;
9394
9495 // Global rate limiter
@@ -142,6 +143,7 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
142143 this . cooloffEnabled = options . cooloff ?. enabled ?? true ;
143144 this . cooloffThreshold = options . cooloff ?. threshold ?? 10 ;
144145 this . cooloffPeriodMs = options . cooloff ?. periodMs ?? 10_000 ;
146+ this . maxCooloffStatesSize = options . cooloff ?. maxStatesSize ?? 1000 ;
145147
146148 // Global rate limiter
147149 this . globalRateLimiter = options . globalRateLimiter ;
@@ -878,8 +880,11 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
878880 }
879881 this . #resetCooloff( queueId ) ;
880882 } else {
881- this . batchedSpanManager . incrementStat ( loopId , "claim_failures" ) ;
882- this . #incrementCooloff( queueId ) ;
883+ // Don't increment cooloff here - the queue was either:
884+ // 1. Empty (removed from master, cache cleaned up)
885+ // 2. Concurrency blocked (message released back to queue)
886+ // Neither case warrants cooloff as they're not failures
887+ this . batchedSpanManager . incrementStat ( loopId , "claim_skipped" ) ;
883888 }
884889 }
885890 }
@@ -904,6 +909,8 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
904909 if ( this . concurrencyManager ) {
905910 const check = await this . concurrencyManager . canProcess ( descriptor ) ;
906911 if ( ! check . allowed ) {
912+ // Queue at max concurrency, back off to avoid repeated attempts
913+ this . #incrementCooloff( queueId ) ;
907914 return false ;
908915 }
909916 }
@@ -953,6 +960,8 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
953960 queueItemsKey ,
954961 masterQueueKey
955962 ) ;
963+ // Concurrency reservation failed, back off to avoid repeated attempts
964+ this . #incrementCooloff( queueId ) ;
956965 return false ;
957966 }
958967 }
@@ -1214,8 +1223,11 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
12141223 this . #resetCooloff( queueId ) ;
12151224 slotsUsed ++ ;
12161225 } else {
1217- this . batchedSpanManager . incrementStat ( loopId , "process_failures" ) ;
1218- this . #incrementCooloff( queueId ) ;
1226+ // Don't increment cooloff here - the queue was either:
1227+ // 1. Empty (removed from master, cache cleaned up)
1228+ // 2. Concurrency blocked (message released back to queue)
1229+ // Neither case warrants cooloff as they're not failures
1230+ this . batchedSpanManager . incrementStat ( loopId , "process_skipped" ) ;
12191231 break ; // Queue empty or blocked, try next queue
12201232 }
12211233 }
@@ -1245,6 +1257,8 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
12451257 if ( this . concurrencyManager ) {
12461258 const check = await this . concurrencyManager . canProcess ( descriptor ) ;
12471259 if ( ! check . allowed ) {
1260+ // Queue at max concurrency, back off to avoid repeated attempts
1261+ this . #incrementCooloff( queueId ) ;
12481262 return false ;
12491263 }
12501264 }
@@ -1294,6 +1308,8 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
12941308 queueItemsKey ,
12951309 masterQueueKey
12961310 ) ;
1311+ // Concurrency reservation failed, back off to avoid repeated attempts
1312+ this . #incrementCooloff( queueId ) ;
12971313 return false ;
12981314 }
12991315 }
@@ -1717,6 +1733,15 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
17171733 }
17181734
17191735 #incrementCooloff( queueId : string ) : void {
1736+ // Safety check: if the cache is too large, just clear it
1737+ if ( this . queueCooloffStates . size >= this . maxCooloffStatesSize ) {
1738+ this . logger . warn ( "Cooloff states cache hit size cap, clearing all entries" , {
1739+ size : this . queueCooloffStates . size ,
1740+ cap : this . maxCooloffStatesSize ,
1741+ } ) ;
1742+ this . queueCooloffStates . clear ( ) ;
1743+ }
1744+
17201745 const state = this . queueCooloffStates . get ( queueId ) ?? {
17211746 tag : "normal" as const ,
17221747 consecutiveFailures : 0 ,
0 commit comments