@@ -89,6 +89,7 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
8989 private cooloffEnabled : boolean ;
9090 private cooloffThreshold : number ;
9191 private cooloffPeriodMs : number ;
92+ private maxCooloffStatesSize : number ;
9293 private queueCooloffStates = new Map < string , QueueCooloffState > ( ) ;
9394
9495 // Global rate limiter
@@ -142,6 +143,7 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
142143 this . cooloffEnabled = options . cooloff ?. enabled ?? true ;
143144 this . cooloffThreshold = options . cooloff ?. threshold ?? 10 ;
144145 this . cooloffPeriodMs = options . cooloff ?. periodMs ?? 10_000 ;
146+ this . maxCooloffStatesSize = options . cooloff ?. maxStatesSize ?? 1000 ;
145147
146148 // Global rate limiter
147149 this . globalRateLimiter = options . globalRateLimiter ;
@@ -878,8 +880,11 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
878880 }
879881 this . #resetCooloff( queueId ) ;
880882 } else {
881- this . batchedSpanManager . incrementStat ( loopId , "claim_failures" ) ;
882- this . #incrementCooloff( queueId ) ;
883+ // Don't increment cooloff here - the queue was either:
884+ // 1. Empty (removed from master, cache cleaned up)
885+ // 2. Concurrency blocked (message released back to queue)
886+ // Neither case warrants cooloff as they're not failures
887+ this . batchedSpanManager . incrementStat ( loopId , "claim_skipped" ) ;
883888 }
884889 }
885890 }
@@ -1214,8 +1219,11 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
12141219 this . #resetCooloff( queueId ) ;
12151220 slotsUsed ++ ;
12161221 } else {
1217- this . batchedSpanManager . incrementStat ( loopId , "process_failures" ) ;
1218- this . #incrementCooloff( queueId ) ;
1222+ // Don't increment cooloff here - the queue was either:
1223+ // 1. Empty (removed from master, cache cleaned up)
1224+ // 2. Concurrency blocked (message released back to queue)
1225+ // Neither case warrants cooloff as they're not failures
1226+ this . batchedSpanManager . incrementStat ( loopId , "process_skipped" ) ;
12191227 break ; // Queue empty or blocked, try next queue
12201228 }
12211229 }
@@ -1717,6 +1725,15 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
17171725 }
17181726
17191727 #incrementCooloff( queueId : string ) : void {
1728+ // Safety check: if the cache is too large, just clear it
1729+ if ( this . queueCooloffStates . size >= this . maxCooloffStatesSize ) {
1730+ this . logger . warn ( "Cooloff states cache hit size cap, clearing all entries" , {
1731+ size : this . queueCooloffStates . size ,
1732+ cap : this . maxCooloffStatesSize ,
1733+ } ) ;
1734+ this . queueCooloffStates . clear ( ) ;
1735+ }
1736+
17201737 const state = this . queueCooloffStates . get ( queueId ) ?? {
17211738 tag : "normal" as const ,
17221739 consecutiveFailures : 0 ,
0 commit comments