From a8315f4ce8e7574dc8abf5c6826f787e233c418e Mon Sep 17 00:00:00 2001 From: charlieww Date: Fri, 27 Mar 2026 00:50:21 +0800 Subject: [PATCH 1/4] =?UTF-8?q?feat:=20feat:=20Implement=20WebSocket=20ser?= =?UTF-8?q?ver=20endpoint=20=E2=80=94=20src/types/metrics.ts?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/types/metrics.ts | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) create mode 100644 src/types/metrics.ts diff --git a/src/types/metrics.ts b/src/types/metrics.ts new file mode 100644 index 0000000..fa864c0 --- /dev/null +++ b/src/types/metrics.ts @@ -0,0 +1,19 @@ +export interface MetricData { + value: number + labels: Record + metadata: Record +} + +// Messages clients send to the server +export type ClientMessage = + | { type: 'subscribe'; stream: string } + | { type: 'unsubscribe'; stream: string } + | { type: 'ping' } + +// Messages the server sends to clients +export type ServerMessage = + | { type: 'update'; stream: string; data: MetricData; timestamp: string } + | { type: 'subscribed'; stream: string; subscriber_count: number } + | { type: 'unsubscribed'; stream: string; subscriber_count: number } + | { type: 'pong' } + | { type: 'error'; message: string; code: string } From 264d9b4b1efd47693866788e70e6708807e2eb10 Mon Sep 17 00:00:00 2001 From: charlieww Date: Fri, 27 Mar 2026 00:50:22 +0800 Subject: [PATCH 2/4] =?UTF-8?q?feat:=20feat:=20Implement=20WebSocket=20ser?= =?UTF-8?q?ver=20endpoint=20=E2=80=94=20src/ws-server.ts?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/ws-server.ts | 284 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 284 insertions(+) create mode 100644 src/ws-server.ts diff --git a/src/ws-server.ts b/src/ws-server.ts new file mode 100644 index 0000000..612ecca --- /dev/null +++ b/src/ws-server.ts @@ -0,0 +1,284 @@ +import { createServer, type AddressInfo } from 'node:http' +import { WebSocketServer, WebSocket, type RawData } from 'ws' +import type { NatsConnection } from 'nats' +import type { ClientMessage, MetricData, ServerMessage } from './types/metrics.js' + +// Idle-connection pruning interval — must be shorter than any upstream LB timeout +const HEARTBEAT_INTERVAL_MS = 30_000 +// Guard against clients that send oversized payloads to inflate heap usage +const MAX_MESSAGE_BYTES = 64 * 1024 // 64 KB + +// WebSocket augmented with per-connection tracking state +interface TrackedClient extends WebSocket { + isAlive: boolean + subscriptions: Set +} + +/** + * Pure subscription/broadcast logic with no network I/O. + * Kept separate so it can be unit-tested without spinning up a real server. + */ +export class MetricBroadcaster { + // stream name → set of subscribed clients + private readonly streams = new Map>() + + subscribe(client: TrackedClient, stream: string): void { + if (!this.streams.has(stream)) { + this.streams.set(stream, new Set()) + } + this.streams.get(stream)!.add(client) + client.subscriptions.add(stream) + } + + unsubscribe(client: TrackedClient, stream: string): void { + const subscribers = this.streams.get(stream) + if (subscribers) { + subscribers.delete(client) + // Remove empty buckets to avoid unbounded map growth + if (subscribers.size === 0) this.streams.delete(stream) + } + client.subscriptions.delete(stream) + } + + /** Remove a client from every stream it joined — call on disconnect. */ + removeClient(client: TrackedClient): void { + for (const stream of client.subscriptions) { + const subscribers = this.streams.get(stream) + if (subscribers) { + subscribers.delete(client) + if (subscribers.size === 0) this.streams.delete(stream) + } + } + client.subscriptions.clear() + } + + /** + * Fan out a metric update to all OPEN subscribers of the stream. + * Returns the number of clients that were sent the message. + */ + broadcast(stream: string, data: MetricData): number { + const subscribers = this.streams.get(stream) + if (!subscribers || subscribers.size === 0) return 0 + + const message: ServerMessage = { + type: 'update', + stream, + data, + timestamp: new Date().toISOString(), + } + const payload = JSON.stringify(message) + let sent = 0 + + for (const client of subscribers) { + if (client.readyState === WebSocket.OPEN) { + client.send(payload) + sent++ + } + } + return sent + } + + subscriberCount(stream: string): number { + return this.streams.get(stream)?.size ?? 0 + } + + streamCount(): number { + return this.streams.size + } +} + +/** WebSocket server that manages per-stream metric subscriptions and broadcasts. */ +export class MetricWebSocketServer { + private readonly broadcaster = new MetricBroadcaster() + private readonly httpServer = createServer() + private readonly wss = new WebSocketServer({ server: this.httpServer }) + private heartbeatTimer: ReturnType | null = null + + constructor(private readonly nc?: NatsConnection) { + this.wss.on('connection', (socket, req) => + this.handleConnection(socket as TrackedClient, req) + ) + } + + /** Bind to the given port. Resolves once the socket is ready to accept connections. */ + listen(port: number): Promise { + return new Promise((resolve, reject) => { + this.httpServer.once('error', reject) + this.httpServer.listen(port, () => { + this.httpServer.off('error', reject) + this.startHeartbeat() + if (this.nc) this.attachNats(this.nc) + resolve() + }) + }) + } + + /** Gracefully close all connections, then shut down the underlying HTTP server. */ + close(): Promise { + if (this.heartbeatTimer !== null) { + clearInterval(this.heartbeatTimer) + this.heartbeatTimer = null + } + + // Send a standard going-away close frame before the server drops the sockets + for (const raw of this.wss.clients) { + raw.close(1001, 'Server shutting down') + } + + return new Promise((resolve, reject) => + this.wss.close(err => (err ? reject(err) : resolve())) + ) + } + + /** Address of the bound server — only valid after listen() resolves. */ + address(): AddressInfo { + return this.httpServer.address() as AddressInfo + } + + /** Push a metric update directly (e.g. forwarded from a NATS message). */ + broadcast(stream: string, data: MetricData): number { + return this.broadcaster.broadcast(stream, data) + } + + private handleConnection( + client: TrackedClient, + req: import('node:http').IncomingMessage, + ): void { + client.isAlive = true + client.subscriptions = new Set() + + const remote = req.socket.remoteAddress ?? 'unknown' + console.log(`[ws-server] Connected: ${remote}`) + + // Reset liveness flag on each pong so the heartbeat knows the client is alive + client.on('pong', () => { client.isAlive = true }) + + client.on('message', (raw: RawData) => { + // Normalise to a single Buffer regardless of the underlying framing + const buf = Buffer.isBuffer(raw) + ? raw + : Buffer.from(raw instanceof ArrayBuffer ? raw : Buffer.concat(raw as Buffer[])) + + if (buf.length > MAX_MESSAGE_BYTES) { + this.send(client, { type: 'error', message: 'Message exceeds 64 KB limit', code: 'MSG_TOO_LARGE' }) + return + } + + this.handleMessage(client, buf.toString('utf8')) + }) + + client.on('close', (code, reason) => { + console.log(`[ws-server] Disconnected: ${remote} (${code} ${reason.toString()})`) + this.broadcaster.removeClient(client) + }) + + // Socket errors are non-fatal — 'close' fires immediately after and handles cleanup + client.on('error', (err: Error) => { + console.error(`[ws-server] Socket error (${remote}):`, err.message) + }) + } + + private handleMessage(client: TrackedClient, raw: string): void { + let msg: ClientMessage + try { + msg = JSON.parse(raw) as ClientMessage + } catch { + this.send(client, { type: 'error', message: 'Invalid JSON payload', code: 'PARSE_ERROR' }) + return + } + + switch (msg.type) { + case 'subscribe': { + if (!isValidStream(msg.stream)) { + this.send(client, { + type: 'error', + message: `Invalid stream name: "${String(msg.stream)}"`, + code: 'INVALID_STREAM', + }) + return + } + this.broadcaster.subscribe(client, msg.stream) + this.send(client, { + type: 'subscribed', + stream: msg.stream, + subscriber_count: this.broadcaster.subscriberCount(msg.stream), + }) + break + } + + case 'unsubscribe': { + this.broadcaster.unsubscribe(client, msg.stream) + this.send(client, { + type: 'unsubscribed', + stream: msg.stream, + subscriber_count: this.broadcaster.subscriberCount(msg.stream), + }) + break + } + + case 'ping': { + this.send(client, { type: 'pong' }) + break + } + + default: { + // Safe cast — msg.type is `never` here statically but can be any string at runtime + const unknownType = (msg as unknown as { type: string }).type + this.send(client, { + type: 'error', + message: `Unknown message type: "${unknownType}"`, + code: 'UNKNOWN_TYPE', + }) + } + } + } + + private send(client: TrackedClient, msg: ServerMessage): void { + if (client.readyState === WebSocket.OPEN) { + client.send(JSON.stringify(msg)) + } + } + + // Ping every connected client; terminate any that have not responded since the last ping. + // This detects half-open TCP connections that will never produce a 'close' event. + private startHeartbeat(): void { + this.heartbeatTimer = setInterval(() => { + for (const raw of this.wss.clients) { + const client = raw as TrackedClient + if (!client.isAlive) { + this.broadcaster.removeClient(client) + client.terminate() + continue + } + client.isAlive = false + client.ping() + } + }, HEARTBEAT_INTERVAL_MS) + + // Unref so the timer does not keep the process alive during tests or clean shutdown + this.heartbeatTimer.unref() + } + + // Subscribe to the NATS wildcard subject and fan out updates to WebSocket subscribers. + // Subject format: dev.metrics. + private attachNats(nc: NatsConnection): void { + const sub = nc.subscribe('dev.metrics.>') + ;(async () => { + for await (const msg of sub) { + try { + const data = JSON.parse(new TextDecoder().decode(msg.data)) as MetricData + const stream = msg.subject.slice('dev.metrics.'.length) + this.broadcaster.broadcast(stream, data) + } catch (err) { + console.error('[ws-server] Failed to parse NATS metric message:', err) + } + } + })() + } +} + +// Stream names must begin with an alphanumeric character, then allow dots/hyphens/underscores, +// and be capped at 128 characters. Rejects path-traversal attempts like "../../etc/passwd". +function isValidStream(stream: unknown): stream is string { + return typeof stream === 'string' && /^[a-zA-Z0-9][a-zA-Z0-9._-]{0,127}$/.test(stream) +} From 2ec48f9fa46d3c04426f8b218fa8e95440fd0de0 Mon Sep 17 00:00:00 2001 From: charlieww Date: Fri, 27 Mar 2026 00:50:24 +0800 Subject: [PATCH 3/4] =?UTF-8?q?feat:=20feat:=20Implement=20WebSocket=20ser?= =?UTF-8?q?ver=20endpoint=20=E2=80=94=20package.json?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- package.json | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/package.json b/package.json index 070b007..eaf4e60 100644 --- a/package.json +++ b/package.json @@ -1,19 +1,30 @@ { - "name": "demo-api-backend", + "name": "@opendev/agent-runtime", "version": "0.1.0", + "private": true, + "type": "module", "scripts": { - "dev": "ts-node src/index.ts", + "dev": "tsx watch src/index.ts", "build": "tsc", - "test": "jest" + "start": "node dist/index.js", + "lint": "tsc --noEmit", + "clean": "rm -rf dist .turbo", + "test": "vitest run", + "test:watch": "vitest" }, "dependencies": { - "express": "^4.18.0" + "@anthropic-ai/sdk": "^0.24.0", + "nats": "^2.18.0", + "pg": "^8.11.3", + "ws": "^8.17.0", + "zod": "^3.22.4" }, "devDependencies": { - "@types/express": "^4.17.0", "@types/node": "^20.0.0", - "typescript": "^5.0.0", - "ts-node": "^10.9.0", - "jest": "^29.0.0" + "@types/pg": "^8.11.3", + "@types/ws": "^8.5.10", + "tsx": "^4.7.1", + "typescript": "^5.4.0", + "vitest": "^1.6.1" } } From 2ba71241ba44f6d7accf8a5e53c2f041e0e3a857 Mon Sep 17 00:00:00 2001 From: charlieww Date: Fri, 27 Mar 2026 00:50:27 +0800 Subject: [PATCH 4/4] =?UTF-8?q?feat:=20feat:=20Implement=20WebSocket=20ser?= =?UTF-8?q?ver=20endpoint=20=E2=80=94=20src/ws-server.test.ts?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/ws-server.test.ts | 321 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 321 insertions(+) create mode 100644 src/ws-server.test.ts diff --git a/src/ws-server.test.ts b/src/ws-server.test.ts new file mode 100644 index 0000000..57acc8f --- /dev/null +++ b/src/ws-server.test.ts @@ -0,0 +1,321 @@ +import { describe, it, expect, beforeEach, afterEach } from 'vitest' +import { WebSocket as WsClient } from 'ws' +import { MetricBroadcaster, MetricWebSocketServer } from './ws-server.js' +import type { MetricData, ServerMessage } from './types/metrics.js' + +// ── Helpers ─────────────────────────────────────────────────────────────────── + +function mockClient(overrides: { readyState?: number } = {}) { + const sent: string[] = [] + return { + isAlive: true, + subscriptions: new Set(), + readyState: overrides.readyState ?? 1, // 1 = WebSocket.OPEN + send(payload: string) { sent.push(payload) }, + _sent: sent, + } +} + +type MockClient = ReturnType + +function connectClient(port: number): Promise { + return new Promise((resolve, reject) => { + const ws = new WsClient(`ws://127.0.0.1:${port}`) + ws.once('open', () => resolve(ws)) + ws.once('error', (err) => reject(err)) + }) +} + +function nextMessage(ws: WsClient): Promise { + return new Promise((resolve, reject) => { + ws.once('message', (data) => { + try { resolve(JSON.parse(data.toString()) as ServerMessage) } + catch (e) { reject(e) } + }) + ws.once('error', reject) + }) +} + +function closeClient(ws: WsClient): Promise { + return new Promise(resolve => { + if (ws.readyState === WsClient.CLOSED) return resolve() + ws.once('close', resolve) + ws.close() + }) +} + +const SAMPLE_METRIC: MetricData = { + value: 88.5, + labels: { host: 'web-1' }, + metadata: { unit: 'percent' }, +} + +// ── MetricBroadcaster unit tests ────────────────────────────────────────────── + +describe('MetricBroadcaster', () => { + let broadcaster: MetricBroadcaster + let clientA: MockClient + let clientB: MockClient + + beforeEach(() => { + broadcaster = new MetricBroadcaster() + clientA = mockClient() + clientB = mockClient() + }) + + describe('subscribe', () => { + it('should add client to the stream subscriber set', () => { + broadcaster.subscribe(clientA as any, 'cpu.usage') + expect(broadcaster.subscriberCount('cpu.usage')).toBe(1) + expect(clientA.subscriptions.has('cpu.usage')).toBe(true) + }) + + it('should allow multiple clients to subscribe to the same stream', () => { + broadcaster.subscribe(clientA as any, 'cpu.usage') + broadcaster.subscribe(clientB as any, 'cpu.usage') + expect(broadcaster.subscriberCount('cpu.usage')).toBe(2) + }) + + it('should allow one client to subscribe to multiple streams', () => { + broadcaster.subscribe(clientA as any, 'cpu.usage') + broadcaster.subscribe(clientA as any, 'mem.usage') + expect(clientA.subscriptions.size).toBe(2) + expect(broadcaster.streamCount()).toBe(2) + }) + }) + + describe('unsubscribe', () => { + it('should remove client from the stream and decrement subscriber count', () => { + broadcaster.subscribe(clientA as any, 'cpu.usage') + broadcaster.unsubscribe(clientA as any, 'cpu.usage') + expect(broadcaster.subscriberCount('cpu.usage')).toBe(0) + }) + + it('should remove an empty stream from the internal map', () => { + broadcaster.subscribe(clientA as any, 'cpu.usage') + broadcaster.unsubscribe(clientA as any, 'cpu.usage') + expect(broadcaster.streamCount()).toBe(0) + }) + + it('should be idempotent when the client is not subscribed', () => { + expect(() => broadcaster.unsubscribe(clientA as any, 'nonexistent')).not.toThrow() + }) + }) + + describe('removeClient', () => { + it('should remove the client from all subscribed streams', () => { + broadcaster.subscribe(clientA as any, 'cpu.usage') + broadcaster.subscribe(clientA as any, 'mem.usage') + broadcaster.removeClient(clientA as any) + expect(broadcaster.subscriberCount('cpu.usage')).toBe(0) + expect(broadcaster.subscriberCount('mem.usage')).toBe(0) + expect(broadcaster.streamCount()).toBe(0) + }) + + it('should not affect other subscribers on shared streams', () => { + broadcaster.subscribe(clientA as any, 'cpu.usage') + broadcaster.subscribe(clientB as any, 'cpu.usage') + broadcaster.removeClient(clientA as any) + expect(broadcaster.subscriberCount('cpu.usage')).toBe(1) + }) + }) + + describe('broadcast', () => { + it('should deliver the update to all OPEN subscribers', () => { + broadcaster.subscribe(clientA as any, 'cpu.usage') + broadcaster.subscribe(clientB as any, 'cpu.usage') + const sent = broadcaster.broadcast('cpu.usage', SAMPLE_METRIC) + expect(sent).toBe(2) + expect(clientA._sent).toHaveLength(1) + expect(clientB._sent).toHaveLength(1) + }) + + it('should include type, stream, data, and an ISO timestamp in the payload', () => { + broadcaster.subscribe(clientA as any, 'cpu.usage') + broadcaster.broadcast('cpu.usage', SAMPLE_METRIC) + const msg = JSON.parse(clientA._sent[0]) as ServerMessage + expect(msg.type).toBe('update') + if (msg.type === 'update') { + expect(msg.stream).toBe('cpu.usage') + expect(msg.data).toEqual(SAMPLE_METRIC) + expect(msg.timestamp).toMatch(/^\d{4}-\d{2}-\d{2}T/) + } + }) + + it('should not deliver to clients subscribed to a different stream', () => { + broadcaster.subscribe(clientA as any, 'cpu.usage') + broadcaster.subscribe(clientB as any, 'mem.usage') + broadcaster.broadcast('cpu.usage', SAMPLE_METRIC) + expect(clientB._sent).toHaveLength(0) + }) + + it('should skip clients whose readyState is not OPEN', () => { + const closedClient = mockClient({ readyState: 3 }) // 3 = CLOSED + broadcaster.subscribe(closedClient as any, 'cpu.usage') + expect(broadcaster.broadcast('cpu.usage', SAMPLE_METRIC)).toBe(0) + }) + + it('should return 0 when no subscribers exist for the stream', () => { + expect(broadcaster.broadcast('no.one.here', SAMPLE_METRIC)).toBe(0) + }) + }) +}) + +// ── MetricWebSocketServer integration tests ─────────────────────────────────── + +describe('MetricWebSocketServer', () => { + let server: MetricWebSocketServer + let port: number + + beforeEach(async () => { + server = new MetricWebSocketServer() // no NATS in tests + await server.listen(0) // 0 = OS-assigned port avoids conflicts + port = server.address().port + }) + + afterEach(async () => { + await server.close() + }) + + it('should accept WebSocket connections', async () => { + const ws = await connectClient(port) + expect(ws.readyState).toBe(WsClient.OPEN) + await closeClient(ws) + }) + + it('should acknowledge a subscribe message with subscriber_count', async () => { + const ws = await connectClient(port) + ws.send(JSON.stringify({ type: 'subscribe', stream: 'cpu.usage' })) + const msg = await nextMessage(ws) + expect(msg.type).toBe('subscribed') + if (msg.type === 'subscribed') { + expect(msg.stream).toBe('cpu.usage') + expect(msg.subscriber_count).toBe(1) + } + await closeClient(ws) + }) + + it('should deliver broadcast updates to subscribed clients', async () => { + const ws = await connectClient(port) + ws.send(JSON.stringify({ type: 'subscribe', stream: 'cpu.usage' })) + await nextMessage(ws) // consume the subscribed ack + + server.broadcast('cpu.usage', SAMPLE_METRIC) + + const update = await nextMessage(ws) + expect(update.type).toBe('update') + if (update.type === 'update') { + expect(update.stream).toBe('cpu.usage') + expect(update.data).toEqual(SAMPLE_METRIC) + } + await closeClient(ws) + }) + + it('should not deliver updates for unsubscribed streams', async () => { + const ws = await connectClient(port) + ws.send(JSON.stringify({ type: 'subscribe', stream: 'cpu.usage' })) + await nextMessage(ws) // subscribed ack + + server.broadcast('mem.usage', SAMPLE_METRIC) // different stream + + const result = await Promise.race([ + nextMessage(ws), + new Promise(r => setTimeout(() => r(null), 150)), + ]) + expect(result).toBeNull() + await closeClient(ws) + }) + + it('should stop delivering updates after unsubscribe', async () => { + const ws = await connectClient(port) + ws.send(JSON.stringify({ type: 'subscribe', stream: 'cpu.usage' })) + await nextMessage(ws) + + ws.send(JSON.stringify({ type: 'unsubscribe', stream: 'cpu.usage' })) + const unsubMsg = await nextMessage(ws) + expect(unsubMsg.type).toBe('unsubscribed') + if (unsubMsg.type === 'unsubscribed') expect(unsubMsg.subscriber_count).toBe(0) + + server.broadcast('cpu.usage', SAMPLE_METRIC) + const result = await Promise.race([ + nextMessage(ws), + new Promise(r => setTimeout(() => r(null), 150)), + ]) + expect(result).toBeNull() + await closeClient(ws) + }) + + it('should respond to ping with pong', async () => { + const ws = await connectClient(port) + ws.send(JSON.stringify({ type: 'ping' })) + const msg = await nextMessage(ws) + expect(msg.type).toBe('pong') + await closeClient(ws) + }) + + it('should return PARSE_ERROR for invalid JSON', async () => { + const ws = await connectClient(port) + ws.send('not json at all') + const msg = await nextMessage(ws) + expect(msg.type).toBe('error') + if (msg.type === 'error') expect(msg.code).toBe('PARSE_ERROR') + await closeClient(ws) + }) + + it('should return UNKNOWN_TYPE for an unrecognised message type', async () => { + const ws = await connectClient(port) + ws.send(JSON.stringify({ type: 'teleport' })) + const msg = await nextMessage(ws) + expect(msg.type).toBe('error') + if (msg.type === 'error') expect(msg.code).toBe('UNKNOWN_TYPE') + await closeClient(ws) + }) + + it('should return INVALID_STREAM for a stream name containing path traversal', async () => { + const ws = await connectClient(port) + ws.send(JSON.stringify({ type: 'subscribe', stream: '../../etc/passwd' })) + const msg = await nextMessage(ws) + expect(msg.type).toBe('error') + if (msg.type === 'error') expect(msg.code).toBe('INVALID_STREAM') + await closeClient(ws) + }) + + it('should return MSG_TOO_LARGE for a payload exceeding 64 KB', async () => { + const ws = await connectClient(port) + ws.send(Buffer.alloc(65 * 1024, 'x')) + const msg = await nextMessage(ws) + expect(msg.type).toBe('error') + if (msg.type === 'error') expect(msg.code).toBe('MSG_TOO_LARGE') + await closeClient(ws) + }) + + it('should broadcast to multiple subscribers on the same stream', async () => { + const [ws1, ws2] = await Promise.all([connectClient(port), connectClient(port)]) + + ws1.send(JSON.stringify({ type: 'subscribe', stream: 'net.rx' })) + ws2.send(JSON.stringify({ type: 'subscribe', stream: 'net.rx' })) + await Promise.all([nextMessage(ws1), nextMessage(ws2)]) // subscribed acks + + const sent = server.broadcast('net.rx', SAMPLE_METRIC) + expect(sent).toBe(2) + + const [m1, m2] = await Promise.all([nextMessage(ws1), nextMessage(ws2)]) + expect(m1.type).toBe('update') + expect(m2.type).toBe('update') + + await Promise.all([closeClient(ws1), closeClient(ws2)]) + }) + + it('should clean up subscriptions when a client disconnects', async () => { + const ws = await connectClient(port) + ws.send(JSON.stringify({ type: 'subscribe', stream: 'disk.io' })) + await nextMessage(ws) + + await closeClient(ws) + // Give the server one event-loop tick to process the close event + await new Promise(r => setTimeout(r, 20)) + + // A broadcast to the now-dead stream should reach 0 clients + expect(server.broadcast('disk.io', SAMPLE_METRIC)).toBe(0) + }) +})