Skip to content

Commit 08f635d

Browse files
committed
fix(batch): optimize processing batch trigger v2
1 parent edf5b14 commit 08f635d

File tree

11 files changed

+313
-65
lines changed

11 files changed

+313
-65
lines changed

apps/webapp/app/env.server.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -951,6 +951,12 @@ const EnvironmentSchema = z
951951
BATCH_QUEUE_MAX_DEFICIT: z.coerce.number().int().default(100),
952952
BATCH_QUEUE_CONSUMER_COUNT: z.coerce.number().int().default(3),
953953
BATCH_QUEUE_CONSUMER_INTERVAL_MS: z.coerce.number().int().default(50),
954+
// Number of master queue shards for horizontal scaling
955+
BATCH_QUEUE_SHARD_COUNT: z.coerce.number().int().default(1),
956+
// Maximum queues to fetch from master queue per iteration
957+
BATCH_QUEUE_MASTER_QUEUE_LIMIT: z.coerce.number().int().default(1000),
958+
// Worker queue blocking timeout in seconds (for two-stage processing)
959+
BATCH_QUEUE_WORKER_QUEUE_TIMEOUT_SECONDS: z.coerce.number().int().default(10),
954960
// Global rate limit: max items processed per second across all consumers
955961
// If not set, no global rate limiting is applied
956962
BATCH_QUEUE_GLOBAL_RATE_LIMIT: z.coerce.number().int().positive().optional(),

apps/webapp/app/v3/runEngine.server.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,10 @@ function createRunEngine() {
171171
drr: {
172172
quantum: env.BATCH_QUEUE_DRR_QUANTUM,
173173
maxDeficit: env.BATCH_QUEUE_MAX_DEFICIT,
174+
masterQueueLimit: env.BATCH_QUEUE_MASTER_QUEUE_LIMIT,
174175
},
176+
shardCount: env.BATCH_QUEUE_SHARD_COUNT,
177+
workerQueueBlockingTimeoutSeconds: env.BATCH_QUEUE_WORKER_QUEUE_TIMEOUT_SECONDS,
175178
consumerCount: env.BATCH_QUEUE_CONSUMER_COUNT,
176179
consumerIntervalMs: env.BATCH_QUEUE_CONSUMER_INTERVAL_MS,
177180
// Default processing concurrency when no specific limit is set

internal-packages/run-engine/src/batch-queue/index.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ export class BatchQueue {
108108
keys: keyProducer,
109109
quantum: options.drr.quantum,
110110
maxDeficit: options.drr.maxDeficit,
111+
masterQueueLimit: options.drr.masterQueueLimit,
111112
logger: {
112113
debug: (msg, ctx) => this.logger.debug(msg, ctx),
113114
error: (msg, ctx) => this.logger.error(msg, ctx),
@@ -121,7 +122,7 @@ export class BatchQueue {
121122
scheduler,
122123
payloadSchema: BatchItemPayloadSchema,
123124
validateOnEnqueue: false, // We control the payload
124-
shardCount: 1, // Batches don't need sharding
125+
shardCount: options.shardCount ?? 1,
125126
consumerCount: options.consumerCount,
126127
consumerIntervalMs: options.consumerIntervalMs,
127128
visibilityTimeoutMs: 60_000, // 1 minute for batch item processing
@@ -131,6 +132,14 @@ export class BatchQueue {
131132
threshold: 5,
132133
periodMs: 5_000,
133134
},
135+
// Enable two-stage processing with worker queues for better parallelism (when configured)
136+
// Worker queues provide better concurrency by separating queue selection from message processing
137+
workerQueue: options.workerQueueBlockingTimeoutSeconds
138+
? {
139+
enabled: true,
140+
blockingTimeoutSeconds: options.workerQueueBlockingTimeoutSeconds,
141+
}
142+
: undefined,
134143
// Concurrency group based on tenant (environment)
135144
// This limits how many batch items can be processed concurrently per environment
136145
// Items wait in queue until capacity frees up

internal-packages/run-engine/src/batch-queue/types.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,8 @@ export type DRRConfig = {
123123
quantum: number;
124124
/** Maximum accumulated deficit (prevents starvation) */
125125
maxDeficit: number;
126+
/** Maximum queues to fetch from master queue (default: 1000) */
127+
masterQueueLimit?: number;
126128
};
127129

128130
// ============================================================================
@@ -196,6 +198,10 @@ export type BatchQueueOptions = {
196198
consumerCount: number;
197199
/** Interval between consumer iterations (ms) */
198200
consumerIntervalMs: number;
201+
/** Number of master queue shards (default: 1) */
202+
shardCount?: number;
203+
/** Worker queue blocking timeout in seconds (enables two-stage processing) */
204+
workerQueueBlockingTimeoutSeconds?: number;
199205
/** Whether to start consumers on initialization */
200206
startConsumers?: boolean;
201207
/**

internal-packages/run-engine/src/engine/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,10 @@ export class RunEngine {
340340
drr: {
341341
quantum: options.batchQueue?.drr?.quantum ?? 5,
342342
maxDeficit: options.batchQueue?.drr?.maxDeficit ?? 50,
343+
masterQueueLimit: options.batchQueue?.drr?.masterQueueLimit,
343344
},
345+
shardCount: options.batchQueue?.shardCount,
346+
workerQueueBlockingTimeoutSeconds: options.batchQueue?.workerQueueBlockingTimeoutSeconds,
344347
consumerCount: options.batchQueue?.consumerCount ?? 2,
345348
consumerIntervalMs: options.batchQueue?.consumerIntervalMs ?? 100,
346349
defaultConcurrency: options.batchQueue?.defaultConcurrency ?? 10,

internal-packages/run-engine/src/engine/types.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,10 @@ export type RunEngineOptions = {
7676
batchQueue?: {
7777
redis: RedisOptions;
7878
drr?: Partial<DRRConfig>;
79+
/** Number of master queue shards (default: 1) */
80+
shardCount?: number;
81+
/** Worker queue blocking timeout in seconds (enables two-stage processing) */
82+
workerQueueBlockingTimeoutSeconds?: number;
7983
consumerCount?: number;
8084
consumerIntervalMs?: number;
8185
/** Default processing concurrency per environment when no specific limit is set */

packages/redis-worker/src/fair-queue/concurrency.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,26 @@ export class ConcurrencyManager {
114114
return await this.redis.scard(key);
115115
}
116116

117+
/**
118+
* Get available capacity for a queue across all concurrency groups.
119+
* Returns the minimum available capacity across all groups.
120+
*/
121+
async getAvailableCapacity(queue: QueueDescriptor): Promise<number> {
122+
let minCapacity = Infinity;
123+
124+
for (const group of this.groups) {
125+
const groupId = group.extractGroupId(queue);
126+
const key = this.keys.concurrencyKey(group.name, groupId);
127+
const current = await this.redis.scard(key);
128+
const limit = (await group.getLimit(groupId)) || group.defaultLimit;
129+
const available = Math.max(0, limit - current);
130+
131+
minCapacity = Math.min(minCapacity, available);
132+
}
133+
134+
return minCapacity === Infinity ? 0 : minCapacity;
135+
}
136+
117137
/**
118138
* Get concurrency limit for a specific group.
119139
*/

0 commit comments

Comments
 (0)