Skip to content

Commit f7cb637

Browse files
authored
fix(streams): scope s2 access token to environment and fix streams v1 appends (#2670)
* fix(streams): scope s2 access token to environment and fix streams v1 appends * Less stale time
1 parent 668559e commit f7cb637

File tree

5 files changed

+90
-13
lines changed

5 files changed

+90
-13
lines changed

apps/webapp/app/env.server.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1226,6 +1226,10 @@ const EnvironmentSchema = z
12261226

12271227
REALTIME_STREAMS_S2_BASIN: z.string().optional(),
12281228
REALTIME_STREAMS_S2_ACCESS_TOKEN: z.string().optional(),
1229+
REALTIME_STREAMS_S2_ACCESS_TOKEN_EXPIRATION_IN_MS: z.coerce
1230+
.number()
1231+
.int()
1232+
.default(60_000 * 60 * 24), // 1 day
12291233
REALTIME_STREAMS_S2_LOG_LEVEL: z
12301234
.enum(["log", "error", "warn", "info", "debug"])
12311235
.default("info"),

apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,7 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
381381
"chunkIndex",
382382
"0",
383383
"data",
384-
part
384+
JSON.stringify(part) + "\n"
385385
);
386386

387387
// Set TTL for cleanup when stream is done

apps/webapp/app/services/realtime/s2realtimeStreams.server.ts

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
// app/realtime/S2RealtimeStreams.ts
2+
import type { UnkeyCache } from "@internal/cache";
23
import { StreamIngestor, StreamResponder, StreamResponseOptions } from "./types";
34
import { Logger, LogLevel } from "@trigger.dev/core/logger";
45
import { randomUUID } from "node:crypto";
@@ -17,6 +18,12 @@ export type S2RealtimeStreamsOptions = {
1718

1819
logger?: Logger;
1920
logLevel?: LogLevel;
21+
22+
accessTokenExpirationInMs?: number;
23+
24+
cache?: UnkeyCache<{
25+
accessToken: string;
26+
}>;
2027
};
2128

2229
type S2IssueAccessTokenResponse = { access_token: string };
@@ -41,6 +48,12 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor {
4148
private readonly logger: Logger;
4249
private readonly level: LogLevel;
4350

51+
private readonly accessTokenExpirationInMs: number;
52+
53+
private readonly cache?: UnkeyCache<{
54+
accessToken: string;
55+
}>;
56+
4457
constructor(opts: S2RealtimeStreamsOptions) {
4558
this.basin = opts.basin;
4659
this.baseUrl = `https://${this.basin}.b.aws.s2.dev/v1`;
@@ -54,14 +67,13 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor {
5467

5568
this.logger = opts.logger ?? new Logger("S2RealtimeStreams", opts.logLevel ?? "info");
5669
this.level = opts.logLevel ?? "info";
57-
}
5870

59-
private toStreamName(runId: string, streamId: string): string {
60-
return `${this.toStreamPrefix(runId)}${streamId}`;
71+
this.cache = opts.cache;
72+
this.accessTokenExpirationInMs = opts.accessTokenExpirationInMs ?? 60_000 * 60 * 24; // 1 day
6173
}
6274

63-
private toStreamPrefix(runId: string): string {
64-
return `${this.streamPrefix}/runs/${runId}/`;
75+
private toStreamName(runId: string, streamId: string): string {
76+
return `${this.streamPrefix}/runs/${runId}/${streamId}`;
6577
}
6678

6779
async initializeStream(
@@ -70,11 +82,12 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor {
7082
): Promise<{ responseHeaders?: Record<string, string> }> {
7183
const id = randomUUID();
7284

73-
const accessToken = await this.s2IssueAccessToken(id, runId, streamId);
85+
const accessToken = await this.getS2AccessToken(id);
7486

7587
return {
7688
responseHeaders: {
7789
"X-S2-Access-Token": accessToken,
90+
"X-S2-Stream-Name": `/runs/${runId}/${streamId}`,
7891
"X-S2-Basin": this.basin,
7992
"X-S2-Flush-Interval-Ms": this.flushIntervalMs.toString(),
8093
"X-S2-Max-Retries": this.maxRetries.toString(),
@@ -153,7 +166,23 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor {
153166
return (await res.json()) as S2AppendAck;
154167
}
155168

156-
private async s2IssueAccessToken(id: string, runId: string, streamId: string): Promise<string> {
169+
private async getS2AccessToken(id: string): Promise<string> {
170+
if (!this.cache) {
171+
return this.s2IssueAccessToken(id);
172+
}
173+
174+
const result = await this.cache.accessToken.swr(this.streamPrefix, async () => {
175+
return this.s2IssueAccessToken(id);
176+
});
177+
178+
if (!result.val) {
179+
throw new Error("Failed to get S2 access token");
180+
}
181+
182+
return result.val;
183+
}
184+
185+
private async s2IssueAccessToken(id: string): Promise<string> {
157186
// POST /v1/access-tokens
158187
const res = await fetch(`https://aws.s2.dev/v1/access-tokens`, {
159188
method: "POST",
@@ -169,10 +198,10 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor {
169198
},
170199
ops: ["append", "create-stream"],
171200
streams: {
172-
prefix: this.toStreamPrefix(runId),
201+
prefix: this.streamPrefix,
173202
},
174203
},
175-
expires_at: new Date(Date.now() + 1000 * 60 * 60 * 24).toISOString(), // 1 day
204+
expires_at: new Date(Date.now() + this.accessTokenExpirationInMs).toISOString(),
176205
auto_prefix_streams: true,
177206
}),
178207
});

apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,16 @@
1+
import {
2+
createCache,
3+
createMemoryStore,
4+
DefaultStatefulContext,
5+
Namespace,
6+
RedisCacheStore,
7+
} from "@internal/cache";
18
import { env } from "~/env.server";
29
import { singleton } from "~/utils/singleton";
3-
import { RedisRealtimeStreams } from "./redisRealtimeStreams.server";
410
import { AuthenticatedEnvironment } from "../apiAuth.server";
5-
import { StreamIngestor, StreamResponder } from "./types";
11+
import { RedisRealtimeStreams } from "./redisRealtimeStreams.server";
612
import { S2RealtimeStreams } from "./s2realtimeStreams.server";
13+
import { StreamIngestor, StreamResponder } from "./types";
714

815
function initializeRedisRealtimeStreams() {
916
return new RedisRealtimeStreams({
@@ -44,9 +51,43 @@ export function getRealtimeStreamInstance(
4451
flushIntervalMs: env.REALTIME_STREAMS_S2_FLUSH_INTERVAL_MS,
4552
maxRetries: env.REALTIME_STREAMS_S2_MAX_RETRIES,
4653
s2WaitSeconds: env.REALTIME_STREAMS_S2_WAIT_SECONDS,
54+
accessTokenExpirationInMs: env.REALTIME_STREAMS_S2_ACCESS_TOKEN_EXPIRATION_IN_MS,
55+
cache: s2RealtimeStreamsCache,
4756
});
4857
}
4958

5059
throw new Error("Realtime streams v2 is required for this run but S2 configuration is missing");
5160
}
5261
}
62+
63+
const s2RealtimeStreamsCache = singleton(
64+
"s2RealtimeStreamsCache",
65+
initializeS2RealtimeStreamsCache
66+
);
67+
68+
function initializeS2RealtimeStreamsCache() {
69+
const ctx = new DefaultStatefulContext();
70+
const redisCacheStore = new RedisCacheStore({
71+
name: "s2-realtime-streams-cache",
72+
connection: {
73+
port: env.REALTIME_STREAMS_REDIS_PORT,
74+
host: env.REALTIME_STREAMS_REDIS_HOST,
75+
username: env.REALTIME_STREAMS_REDIS_USERNAME,
76+
password: env.REALTIME_STREAMS_REDIS_PASSWORD,
77+
enableAutoPipelining: true,
78+
...(env.REALTIME_STREAMS_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
79+
keyPrefix: "s2-realtime-streams-cache:",
80+
},
81+
useModernCacheKeyBuilder: true,
82+
});
83+
84+
const memoryStore = createMemoryStore(5000, 0.001);
85+
86+
return createCache({
87+
accessToken: new Namespace<string>(ctx, {
88+
stores: [memoryStore, redisCacheStore],
89+
fresh: Math.floor(env.REALTIME_STREAMS_S2_ACCESS_TOKEN_EXPIRATION_IN_MS / 2),
90+
stale: Math.floor(env.REALTIME_STREAMS_S2_ACCESS_TOKEN_EXPIRATION_IN_MS / 2 + 60_000),
91+
}),
92+
});
93+
}

packages/core/src/v3/realtimeStreams/streamInstance.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ export class StreamInstance<T> implements StreamsWriter {
5050
})
5151
: new StreamsWriterV2({
5252
basin: parsedResponse.basin,
53-
stream: this.options.key,
53+
stream: parsedResponse.streamName ?? this.options.key,
5454
accessToken: parsedResponse.accessToken,
5555
source: this.options.source,
5656
signal: this.options.signal,
@@ -105,6 +105,7 @@ type ParsedStreamResponse =
105105
basin: string;
106106
flushIntervalMs?: number;
107107
maxRetries?: number;
108+
streamName?: string;
108109
};
109110

110111
function parseCreateStreamResponse(
@@ -124,13 +125,15 @@ function parseCreateStreamResponse(
124125

125126
const flushIntervalMs = headers?.["x-s2-flush-interval-ms"];
126127
const maxRetries = headers?.["x-s2-max-retries"];
128+
const streamName = headers?.["x-s2-stream-name"];
127129

128130
return {
129131
version: "v2",
130132
accessToken,
131133
basin,
132134
flushIntervalMs: flushIntervalMs ? parseInt(flushIntervalMs) : undefined,
133135
maxRetries: maxRetries ? parseInt(maxRetries) : undefined,
136+
streamName,
134137
};
135138
}
136139

0 commit comments

Comments
 (0)