Skip to content

Commit f592aac

Browse files
committed
Fix fair queue heartbeat
1 parent 5211ba6 commit f592aac

File tree

2 files changed

+280
-4
lines changed

2 files changed

+280
-4
lines changed
Lines changed: 248 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,248 @@
1+
import { describe, expect } from "vitest";
2+
import { redisTest } from "@internal/testcontainers";
3+
import { createRedisClient } from "@internal/redis";
4+
import { VisibilityManager, DefaultFairQueueKeyProducer } from "../index.js";
5+
import type { FairQueueKeyProducer } from "../types.js";
6+
7+
describe("VisibilityManager", () => {
8+
let keys: FairQueueKeyProducer;
9+
10+
describe("heartbeat", () => {
11+
redisTest(
12+
"should return true when message exists in in-flight set",
13+
{ timeout: 10000 },
14+
async ({ redisOptions }) => {
15+
keys = new DefaultFairQueueKeyProducer({ prefix: "test" });
16+
17+
const manager = new VisibilityManager({
18+
redis: redisOptions,
19+
keys,
20+
shardCount: 1,
21+
defaultTimeoutMs: 5000,
22+
});
23+
24+
const redis = createRedisClient(redisOptions);
25+
const queueId = "tenant:t1:queue:heartbeat-exists";
26+
const queueKey = keys.queueKey(queueId);
27+
const queueItemsKey = keys.queueItemsKey(queueId);
28+
29+
// Add a message to the queue
30+
const messageId = "heartbeat-test-msg";
31+
const storedMessage = {
32+
id: messageId,
33+
queueId,
34+
tenantId: "t1",
35+
payload: { id: 1, value: "test" },
36+
timestamp: Date.now() - 1000,
37+
attempt: 1,
38+
};
39+
40+
await redis.zadd(queueKey, storedMessage.timestamp, messageId);
41+
await redis.hset(queueItemsKey, messageId, JSON.stringify(storedMessage));
42+
43+
// Claim the message (moves it to in-flight set)
44+
const claimResult = await manager.claim(queueId, queueKey, queueItemsKey, "consumer-1", 5000);
45+
expect(claimResult.claimed).toBe(true);
46+
47+
// Heartbeat should succeed since message is in-flight
48+
const heartbeatResult = await manager.heartbeat(messageId, queueId, 5000);
49+
expect(heartbeatResult).toBe(true);
50+
51+
await manager.close();
52+
await redis.quit();
53+
}
54+
);
55+
56+
redisTest(
57+
"should return false when message does not exist in in-flight set",
58+
{ timeout: 10000 },
59+
async ({ redisOptions }) => {
60+
keys = new DefaultFairQueueKeyProducer({ prefix: "test" });
61+
62+
const manager = new VisibilityManager({
63+
redis: redisOptions,
64+
keys,
65+
shardCount: 1,
66+
defaultTimeoutMs: 5000,
67+
});
68+
69+
// Heartbeat for a message that was never claimed
70+
const heartbeatResult = await manager.heartbeat(
71+
"non-existent-msg",
72+
"tenant:t1:queue:non-existent",
73+
5000
74+
);
75+
expect(heartbeatResult).toBe(false);
76+
77+
await manager.close();
78+
}
79+
);
80+
81+
redisTest(
82+
"should return false after message is completed",
83+
{ timeout: 10000 },
84+
async ({ redisOptions }) => {
85+
keys = new DefaultFairQueueKeyProducer({ prefix: "test" });
86+
87+
const manager = new VisibilityManager({
88+
redis: redisOptions,
89+
keys,
90+
shardCount: 1,
91+
defaultTimeoutMs: 5000,
92+
});
93+
94+
const redis = createRedisClient(redisOptions);
95+
const queueId = "tenant:t1:queue:heartbeat-after-complete";
96+
const queueKey = keys.queueKey(queueId);
97+
const queueItemsKey = keys.queueItemsKey(queueId);
98+
99+
// Add and claim a message
100+
const messageId = "completed-msg";
101+
const storedMessage = {
102+
id: messageId,
103+
queueId,
104+
tenantId: "t1",
105+
payload: { id: 1, value: "test" },
106+
timestamp: Date.now() - 1000,
107+
attempt: 1,
108+
};
109+
110+
await redis.zadd(queueKey, storedMessage.timestamp, messageId);
111+
await redis.hset(queueItemsKey, messageId, JSON.stringify(storedMessage));
112+
113+
const claimResult = await manager.claim(queueId, queueKey, queueItemsKey, "consumer-1", 5000);
114+
expect(claimResult.claimed).toBe(true);
115+
116+
// Heartbeat should work before complete
117+
const heartbeatBefore = await manager.heartbeat(messageId, queueId, 5000);
118+
expect(heartbeatBefore).toBe(true);
119+
120+
// Complete the message
121+
await manager.complete(messageId, queueId);
122+
123+
// Heartbeat should fail after complete
124+
const heartbeatAfter = await manager.heartbeat(messageId, queueId, 5000);
125+
expect(heartbeatAfter).toBe(false);
126+
127+
await manager.close();
128+
await redis.quit();
129+
}
130+
);
131+
132+
redisTest(
133+
"should correctly update the deadline score",
134+
{ timeout: 10000 },
135+
async ({ redisOptions }) => {
136+
keys = new DefaultFairQueueKeyProducer({ prefix: "test" });
137+
138+
const manager = new VisibilityManager({
139+
redis: redisOptions,
140+
keys,
141+
shardCount: 1,
142+
defaultTimeoutMs: 1000,
143+
});
144+
145+
const redis = createRedisClient(redisOptions);
146+
const queueId = "tenant:t1:queue:heartbeat-deadline";
147+
const queueKey = keys.queueKey(queueId);
148+
const queueItemsKey = keys.queueItemsKey(queueId);
149+
150+
// Add and claim a message with short timeout
151+
const messageId = "deadline-test-msg";
152+
const storedMessage = {
153+
id: messageId,
154+
queueId,
155+
tenantId: "t1",
156+
payload: { id: 1, value: "test" },
157+
timestamp: Date.now() - 1000,
158+
attempt: 1,
159+
};
160+
161+
await redis.zadd(queueKey, storedMessage.timestamp, messageId);
162+
await redis.hset(queueItemsKey, messageId, JSON.stringify(storedMessage));
163+
164+
// Claim with 1 second timeout
165+
await manager.claim(queueId, queueKey, queueItemsKey, "consumer-1", 1000);
166+
167+
// Get initial deadline
168+
const inflightKey = keys.inflightKey(0);
169+
const member = `${messageId}:${queueId}`;
170+
const initialScore = await redis.zscore(inflightKey, member);
171+
expect(initialScore).not.toBeNull();
172+
173+
// Wait a bit
174+
await new Promise((resolve) => setTimeout(resolve, 100));
175+
176+
// Extend deadline by 10 seconds
177+
const beforeHeartbeat = Date.now();
178+
const heartbeatSuccess = await manager.heartbeat(messageId, queueId, 10000);
179+
expect(heartbeatSuccess).toBe(true);
180+
181+
// Check that deadline was extended
182+
const newScore = await redis.zscore(inflightKey, member);
183+
expect(newScore).not.toBeNull();
184+
185+
// New deadline should be approximately now + 10 seconds
186+
const newDeadline = parseFloat(newScore!);
187+
expect(newDeadline).toBeGreaterThan(parseFloat(initialScore!));
188+
expect(newDeadline).toBeGreaterThanOrEqual(beforeHeartbeat + 10000);
189+
// Allow some tolerance for execution time
190+
expect(newDeadline).toBeLessThan(beforeHeartbeat + 11000);
191+
192+
await manager.close();
193+
await redis.quit();
194+
}
195+
);
196+
197+
redisTest(
198+
"should handle multiple consecutive heartbeats",
199+
{ timeout: 10000 },
200+
async ({ redisOptions }) => {
201+
keys = new DefaultFairQueueKeyProducer({ prefix: "test" });
202+
203+
const manager = new VisibilityManager({
204+
redis: redisOptions,
205+
keys,
206+
shardCount: 1,
207+
defaultTimeoutMs: 1000,
208+
});
209+
210+
const redis = createRedisClient(redisOptions);
211+
const queueId = "tenant:t1:queue:multi-heartbeat";
212+
const queueKey = keys.queueKey(queueId);
213+
const queueItemsKey = keys.queueItemsKey(queueId);
214+
215+
// Add and claim a message
216+
const messageId = "multi-heartbeat-msg";
217+
const storedMessage = {
218+
id: messageId,
219+
queueId,
220+
tenantId: "t1",
221+
payload: { id: 1, value: "test" },
222+
timestamp: Date.now() - 1000,
223+
attempt: 1,
224+
};
225+
226+
await redis.zadd(queueKey, storedMessage.timestamp, messageId);
227+
await redis.hset(queueItemsKey, messageId, JSON.stringify(storedMessage));
228+
229+
await manager.claim(queueId, queueKey, queueItemsKey, "consumer-1", 1000);
230+
231+
// Multiple heartbeats should all succeed
232+
for (let i = 0; i < 5; i++) {
233+
const result = await manager.heartbeat(messageId, queueId, 1000);
234+
expect(result).toBe(true);
235+
await new Promise((resolve) => setTimeout(resolve, 50));
236+
}
237+
238+
// Message should still be in-flight
239+
const inflightCount = await manager.getTotalInflightCount();
240+
expect(inflightCount).toBe(1);
241+
242+
await manager.close();
243+
await redis.quit();
244+
}
245+
);
246+
});
247+
});
248+

packages/redis-worker/src/fair-queue/visibility.ts

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -141,11 +141,15 @@ export class VisibilityManager {
141141
const member = this.#makeMember(messageId, queueId);
142142
const newDeadline = Date.now() + extendMs;
143143

144-
// Update the score (deadline) in the in-flight set
145-
// Only update if the message is still in the set
146-
const result = await this.redis.zadd(inflightKey, "XX", newDeadline, member);
144+
// Use Lua script to atomically check existence and update score
145+
// ZADD XX returns 0 even on successful updates, so we use a custom command
146+
const result = await this.redis.heartbeatMessage(
147+
inflightKey,
148+
member,
149+
newDeadline.toString()
150+
);
147151

148-
const success = result !== 0;
152+
const success = result === 1;
149153

150154
if (success) {
151155
this.logger.debug("Heartbeat successful", {
@@ -455,6 +459,28 @@ redis.call('HDEL', inflightDataKey, messageId)
455459
redis.call('ZADD', queueKey, score, messageId)
456460
redis.call('HSET', queueItemsKey, messageId, payload)
457461
462+
return 1
463+
`,
464+
});
465+
466+
// Atomic heartbeat: check if member exists and update score
467+
// ZADD XX returns 0 even on successful updates (it counts new additions only)
468+
// So we need to check existence first with ZSCORE
469+
this.redis.defineCommand("heartbeatMessage", {
470+
numberOfKeys: 1,
471+
lua: `
472+
local inflightKey = KEYS[1]
473+
local member = ARGV[1]
474+
local newDeadline = tonumber(ARGV[2])
475+
476+
-- Check if member exists in the in-flight set
477+
local score = redis.call('ZSCORE', inflightKey, member)
478+
if not score then
479+
return 0
480+
end
481+
482+
-- Update the deadline
483+
redis.call('ZADD', inflightKey, 'XX', newDeadline, member)
458484
return 1
459485
`,
460486
});
@@ -483,5 +509,7 @@ declare module "@internal/redis" {
483509
messageId: string,
484510
score: string
485511
): Promise<number>;
512+
513+
heartbeatMessage(inflightKey: string, member: string, newDeadline: string): Promise<number>;
486514
}
487515
}

0 commit comments

Comments
 (0)