diff --git a/mcpjam-inspector/HOSTED_MCPJAM.md b/mcpjam-inspector/HOSTED_MCPJAM.md new file mode 100644 index 000000000..ba776f5d8 --- /dev/null +++ b/mcpjam-inspector/HOSTED_MCPJAM.md @@ -0,0 +1,17 @@ +# Goal + +We want to create a hosted version of MCPJam. It will be hosted via Docker on Railway. It must have everything the Desktop version has. + +# Current limitations + +Currently MCPJam was designed to be a Desktop app ran locally either through `npx`, Electron, Docker. The Hono backend has a MCPClientManager singleton. This means if we host it, everyone will be sharing the same MCPClientManager object, and we will have collisions. People will have access to other people's MCP servers and see everyone else's logs. + +We cannot have this singleton behavior in a hosted version. Everyone should have their own MCPClientManager in isolation. + +# Requirements + +- We want to create a mono-repo that supports both the current local desktop, but also a hosted version. +- In hosted environment, everyone must have their own client manager in isolation. +- Try to scale, handle what happens when there are LOTS of people connected to the server. +- Must be deployable via Docker. +- Changes must be as minimalistic as possible. Least amount of impact and code required as possible. diff --git a/mcpjam-inspector/server/app.ts b/mcpjam-inspector/server/app.ts index 85aadbab0..c35fdda57 100644 --- a/mcpjam-inspector/server/app.ts +++ b/mcpjam-inspector/server/app.ts @@ -16,7 +16,12 @@ import { MCPClientManager } from "@mcpjam/sdk"; import { initElicitationCallback } from "./routes/mcp/elicitation.js"; import { rpcLogBus } from "./services/rpc-log-bus.js"; import { progressStore } from "./services/progress-store.js"; +import { + createClientManagerStore, + readSessionStoreOptionsFromEnv, +} from "./services/client-manager-store.js"; import { CORS_ORIGINS, HOSTED_MODE, ALLOWED_HOSTS } from "./config.js"; +import { resolveRequestSessionId } from "./middleware/client-manager-session.js"; import path from "path"; // Security imports @@ -82,52 +87,54 @@ export function createHonoApp() { generateSessionToken(); const app = new Hono(); - // Create the MCPJam client manager instance and wire RPC logging to SSE bus - const mcpClientManager = new MCPClientManager( - {}, - { - rpcLogger: ({ direction, message, serverId }) => { - rpcLogBus.publish({ - serverId, - direction, - timestamp: new Date().toISOString(), - message, - }); - }, - progressHandler: ({ - serverId, - progressToken, - progress, - total, - message, - }) => { - // Store progress for UI access using the real progressToken from the notification - progressStore.publish({ - serverId, - progressToken, - progress, - total, - message, - timestamp: new Date().toISOString(), - }); - }, - }, - ); + const mcpClientManagerStore = createClientManagerStore({ + hostedMode: HOSTED_MODE, + managerFactory: (sessionId) => { + // Create a manager and wire RPC/progress logging for each isolated session. + const manager = new MCPClientManager( + {}, + { + rpcLogger: ({ direction, message, serverId }) => { + rpcLogBus.publish({ + sessionId, + serverId, + direction, + timestamp: new Date().toISOString(), + message, + }); + }, + progressHandler: ({ + serverId, + progressToken, + progress, + total, + message, + }) => { + // Store progress for UI access using the real progressToken from the notification + progressStore.publish({ + sessionId, + serverId, + progressToken, + progress, + total, + message, + timestamp: new Date().toISOString(), + }); + }, + }, + ); - // Initialize elicitation callback immediately so tasks/result calls work - // without needing to hit the elicitation endpoints first - initElicitationCallback(mcpClientManager); + // Register callback per manager instance so task-related elicitation works. + initElicitationCallback(manager); + return manager; + }, + sessionStoreOptions: readSessionStoreOptionsFromEnv(), + }); if (process.env.DEBUG_MCP_SELECTION === "1") { appLogger.debug("[mcpjam][boot] DEBUG_MCP_SELECTION enabled"); } - // Middleware to inject the client manager into context - app.use("*", async (c, next) => { - c.mcpClientManager = mcpClientManager; - await next(); - }); - // ===== SECURITY MIDDLEWARE STACK ===== // Order matters: headers -> origin validation -> session auth @@ -142,6 +149,14 @@ export function createHonoApp() { // ===== END SECURITY MIDDLEWARE ===== + // Resolve the manager only after security middleware has run. + app.use("*", async (c, next) => { + const sessionId = resolveRequestSessionId(c, HOSTED_MODE); + c.mcpSessionId = sessionId; + c.mcpClientManager = mcpClientManagerStore.getManager(sessionId); + await next(); + }); + // Middleware - only enable HTTP request logging in dev mode or when --verbose is passed const enableHttpLogs = process.env.NODE_ENV !== "production" || diff --git a/mcpjam-inspector/server/index.ts b/mcpjam-inspector/server/index.ts index 3c6f77771..713d6c10a 100644 --- a/mcpjam-inspector/server/index.ts +++ b/mcpjam-inspector/server/index.ts @@ -22,6 +22,7 @@ import { sessionAuthMiddleware, scrubTokenFromUrl, } from "./middleware/session-auth"; +import { resolveRequestSessionId } from "./middleware/client-manager-session"; import { originValidationMiddleware } from "./middleware/origin-validation"; import { securityHeadersMiddleware } from "./middleware/security-headers"; @@ -82,8 +83,14 @@ function logBox(content: string, title?: string) { // Import routes and services import mcpRoutes from "./routes/mcp/index"; import appsRoutes from "./routes/apps/index"; +import { initElicitationCallback } from "./routes/mcp/elicitation"; import { rpcLogBus } from "./services/rpc-log-bus"; +import { progressStore } from "./services/progress-store"; import { tunnelManager } from "./services/tunnel-manager"; +import { + createClientManagerStore, + readSessionStoreOptionsFromEnv, +} from "./services/client-manager-store"; import { SERVER_PORT, SERVER_HOSTNAME, @@ -218,24 +225,45 @@ if (!process.env.CONVEX_HTTP_URL) { ); } -// Initialize centralized MCPJam Client Manager and wire RPC logging to SSE bus -const mcpClientManager = new MCPClientManager( - {}, - { - rpcLogger: ({ direction, message, serverId }) => { - rpcLogBus.publish({ - serverId, - direction, - timestamp: new Date().toISOString(), - message, - }); - }, +const mcpClientManagerStore = createClientManagerStore({ + hostedMode: HOSTED_MODE, + managerFactory: (sessionId) => { + const manager = new MCPClientManager( + {}, + { + rpcLogger: ({ direction, message, serverId }) => { + rpcLogBus.publish({ + sessionId, + serverId, + direction, + timestamp: new Date().toISOString(), + message, + }); + }, + progressHandler: ({ + serverId, + progressToken, + progress, + total, + message, + }) => { + progressStore.publish({ + sessionId, + serverId, + progressToken, + progress, + total, + message, + timestamp: new Date().toISOString(), + }); + }, + }, + ); + + initElicitationCallback(manager); + return manager; }, -); -// Middleware to inject client manager into context -app.use("*", async (c, next) => { - c.mcpClientManager = mcpClientManager; - await next(); + sessionStoreOptions: readSessionStoreOptionsFromEnv(), }); // ===== SECURITY MIDDLEWARE STACK ===== @@ -252,6 +280,14 @@ app.use("*", sessionAuthMiddleware); // ===== END SECURITY MIDDLEWARE ===== +// Resolve a manager only after the security stack has run. +app.use("*", async (c, next) => { + const sessionId = resolveRequestSessionId(c, HOSTED_MODE); + c.mcpSessionId = sessionId; + c.mcpClientManager = mcpClientManagerStore.getManager(sessionId); + await next(); +}); + // Middleware - only enable HTTP request logging in dev mode or when --verbose is passed const enableHttpLogs = process.env.NODE_ENV !== "production" || process.env.VERBOSE_LOGS === "true"; @@ -410,14 +446,16 @@ const server = serve({ // Handle graceful shutdown process.on("SIGINT", async () => { - console.log("\nšŸ›‘ Shutting down gracefully..."); + appLogger.info("\nšŸ›‘ Shutting down gracefully..."); + await mcpClientManagerStore.dispose(); await tunnelManager.closeAll(); server.close(); process.exit(0); }); process.on("SIGTERM", async () => { - console.log("\nšŸ›‘ Shutting down gracefully..."); + appLogger.info("\nšŸ›‘ Shutting down gracefully..."); + await mcpClientManagerStore.dispose(); await tunnelManager.closeAll(); server.close(); process.exit(0); diff --git a/mcpjam-inspector/server/middleware/__tests__/client-manager-session.test.ts b/mcpjam-inspector/server/middleware/__tests__/client-manager-session.test.ts new file mode 100644 index 000000000..6f64374d7 --- /dev/null +++ b/mcpjam-inspector/server/middleware/__tests__/client-manager-session.test.ts @@ -0,0 +1,68 @@ +import { describe, expect, it } from "vitest"; +import { Hono } from "hono"; +import { resolveRequestSessionId } from "../client-manager-session.js"; + +function createTestApp(hostedMode: boolean): Hono { + const app = new Hono(); + + app.use("*", async (c, next) => { + (c as any).resolvedSessionId = resolveRequestSessionId(c, hostedMode); + await next(); + }); + + app.get("/api/test", (c) => { + return c.json({ sessionId: (c as any).resolvedSessionId ?? null }); + }); + + return app; +} + +describe("resolveRequestSessionId", () => { + it("does not create a session when hosted mode is disabled", async () => { + const app = createTestApp(false); + + const res = await app.request("/api/test"); + const data = await res.json(); + + expect(data.sessionId).toBeNull(); + expect(res.headers.get("set-cookie")).toBeNull(); + }); + + it("uses explicit session header when present", async () => { + const app = createTestApp(true); + + const res = await app.request("/api/test", { + headers: { "x-mcpjam-session-id": "header-session-1" }, + }); + const data = await res.json(); + + expect(data.sessionId).toBe("header-session-1"); + expect(res.headers.get("set-cookie")).toBeNull(); + }); + + it("uses existing cookie session when present", async () => { + const app = createTestApp(true); + + const res = await app.request("/api/test", { + headers: { Cookie: "mcpjam_session_id=cookie-session-1" }, + }); + const data = await res.json(); + + expect(data.sessionId).toBe("cookie-session-1"); + expect(res.headers.get("set-cookie")).toBeNull(); + }); + + it("creates a new session cookie when no session identifier exists", async () => { + const app = createTestApp(true); + + const res = await app.request("/api/test"); + const data = await res.json(); + const setCookie = res.headers.get("set-cookie"); + + expect(data.sessionId).toMatch( + /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i, + ); + expect(setCookie).toContain(`mcpjam_session_id=${data.sessionId}`); + expect(setCookie).toContain("HttpOnly"); + }); +}); diff --git a/mcpjam-inspector/server/middleware/client-manager-session.ts b/mcpjam-inspector/server/middleware/client-manager-session.ts new file mode 100644 index 000000000..ccb4565d8 --- /dev/null +++ b/mcpjam-inspector/server/middleware/client-manager-session.ts @@ -0,0 +1,73 @@ +import type { Context } from "hono"; + +export const MCPJAM_SESSION_COOKIE_NAME = "mcpjam_session_id"; +export const MCPJAM_SESSION_HEADER_NAME = "x-mcpjam-session-id"; + +const SESSION_ID_PATTERN = /^[A-Za-z0-9._:-]{1,128}$/; +const COOKIE_MAX_AGE_SECONDS = 60 * 60 * 24 * 30; + +export function resolveRequestSessionId( + c: Context, + hostedMode: boolean, +): string | undefined { + if (!hostedMode) return undefined; + + const headerSessionId = normalizeSessionId( + c.req.header(MCPJAM_SESSION_HEADER_NAME), + ); + if (headerSessionId) return headerSessionId; + + const cookieSessionId = normalizeSessionId( + getCookieValue(c.req.header("cookie"), MCPJAM_SESSION_COOKIE_NAME), + ); + if (cookieSessionId) return cookieSessionId; + + const generatedSessionId = crypto.randomUUID(); + const secureSuffix = isSecureRequest(c) ? "; Secure" : ""; + const cookie = `${MCPJAM_SESSION_COOKIE_NAME}=${generatedSessionId}; Path=/; HttpOnly; SameSite=Lax; Max-Age=${COOKIE_MAX_AGE_SECONDS}${secureSuffix}`; + + c.header("Set-Cookie", cookie, { append: true }); + return generatedSessionId; +} + +function normalizeSessionId(rawValue: string | undefined): string | undefined { + if (!rawValue) return undefined; + const trimmed = rawValue.trim(); + if (!trimmed || !SESSION_ID_PATTERN.test(trimmed)) return undefined; + return trimmed; +} + +function getCookieValue( + cookieHeader: string | undefined, + cookieName: string, +): string | undefined { + if (!cookieHeader) return undefined; + + const cookies = cookieHeader.split(";"); + for (const part of cookies) { + const [name, ...rest] = part.trim().split("="); + if (name !== cookieName) continue; + const rawValue = rest.join("="); + if (!rawValue) return undefined; + try { + return decodeURIComponent(rawValue); + } catch { + return rawValue; + } + } + + return undefined; +} + +function isSecureRequest(c: Context): boolean { + const forwardedProto = c.req.header("x-forwarded-proto"); + if (forwardedProto) { + return forwardedProto.toLowerCase().includes("https"); + } + + try { + return new URL(c.req.url).protocol === "https:"; + } catch { + return false; + } +} diff --git a/mcpjam-inspector/server/routes/mcp/servers.ts b/mcpjam-inspector/server/routes/mcp/servers.ts index e4fdf252a..d65de0acf 100644 --- a/mcpjam-inspector/server/routes/mcp/servers.ts +++ b/mcpjam-inspector/server/routes/mcp/servers.ts @@ -237,6 +237,7 @@ servers.post("/reconnect", async (c) => { // Stream JSON-RPC messages over SSE for all servers. servers.get("/rpc/stream", async (c) => { const serverIds = c.mcpClientManager.listServers(); + const sessionId = c.mcpSessionId; const url = new URL(c.req.url); const replay = parseInt(url.searchParams.get("replay") || "0", 10); @@ -254,7 +255,7 @@ servers.get("/rpc/stream", async (c) => { // Replay recent messages for all known servers try { const recent = rpcLogBus.getBuffer( - serverIds, + { serverIds, sessionId }, isNaN(replay) ? 0 : replay, ); for (const evt of recent) { @@ -263,9 +264,12 @@ servers.get("/rpc/stream", async (c) => { } catch {} // Subscribe to live events for all known servers - const unsubscribe = rpcLogBus.subscribe(serverIds, (evt: RpcLogEvent) => { - send({ type: "rpc", ...evt }); - }); + const unsubscribe = rpcLogBus.subscribe( + { serverIds, sessionId }, + (evt: RpcLogEvent) => { + send({ type: "rpc", ...evt }); + }, + ); // Keepalive comments const keepalive = setInterval(() => { diff --git a/mcpjam-inspector/server/routes/mcp/tasks.ts b/mcpjam-inspector/server/routes/mcp/tasks.ts index d53c3487b..b975b81b8 100644 --- a/mcpjam-inspector/server/routes/mcp/tasks.ts +++ b/mcpjam-inspector/server/routes/mcp/tasks.ts @@ -160,7 +160,7 @@ tasks.post("/progress", async (c) => { if (!serverId) return c.json({ error: "serverId is required" }, 400); - const progress = progressStore.getLatestProgress(serverId); + const progress = progressStore.getLatestProgress(serverId, c.mcpSessionId); return c.json({ progress: progress ?? null }); } catch (error) { console.error("Error getting progress:", error); @@ -180,7 +180,7 @@ tasks.post("/progress/all", async (c) => { if (!serverId) return c.json({ error: "serverId is required" }, 400); - const allProgress = progressStore.getAllProgress(serverId); + const allProgress = progressStore.getAllProgress(serverId, c.mcpSessionId); return c.json({ progress: allProgress }); } catch (error) { console.error("Error getting all progress:", error); diff --git a/mcpjam-inspector/server/services/__tests__/client-manager-store.test.ts b/mcpjam-inspector/server/services/__tests__/client-manager-store.test.ts new file mode 100644 index 000000000..05a945032 --- /dev/null +++ b/mcpjam-inspector/server/services/__tests__/client-manager-store.test.ts @@ -0,0 +1,117 @@ +import { describe, expect, it, vi } from "vitest"; +import type { MCPClientManager } from "@mcpjam/sdk"; +import { + SessionClientManagerStore, + SingletonClientManagerStore, +} from "../client-manager-store.js"; + +type FakeManager = { + listServers: ReturnType; + getClient: ReturnType; + disconnectServer: ReturnType; + removeServer: ReturnType; +}; + +function createFakeManager(serverIds: string[] = []): FakeManager { + return { + listServers: vi.fn().mockReturnValue(serverIds), + getClient: vi + .fn() + .mockImplementation((serverId: string) => + serverIds.includes(serverId) ? {} : null, + ), + disconnectServer: vi.fn().mockResolvedValue(undefined), + removeServer: vi.fn(), + }; +} + +describe("SingletonClientManagerStore", () => { + it("reuses the same manager for all session keys", () => { + const manager = createFakeManager() as unknown as MCPClientManager; + const managerFactory = vi.fn().mockReturnValue(manager); + const store = new SingletonClientManagerStore(managerFactory); + + const first = store.getManager("session-a"); + const second = store.getManager("session-b"); + + expect(first).toBe(second); + expect(first).toBe(manager); + expect(managerFactory).toHaveBeenCalledTimes(1); + }); +}); + +describe("SessionClientManagerStore", () => { + it("creates isolated managers per session key", () => { + const managerFactory = vi + .fn() + .mockReturnValueOnce(createFakeManager() as unknown as MCPClientManager) + .mockReturnValueOnce(createFakeManager() as unknown as MCPClientManager); + const store = new SessionClientManagerStore(managerFactory, { + ttlMs: 60_000, + sweepIntervalMs: 1, + maxEntries: 10, + now: () => 1_000, + }); + + const firstA = store.getManager("session-a"); + const secondA = store.getManager("session-a"); + const firstB = store.getManager("session-b"); + + expect(firstA).toBe(secondA); + expect(firstA).not.toBe(firstB); + expect(managerFactory).toHaveBeenCalledTimes(2); + }); + + it("evicts expired session managers and disconnects servers", async () => { + const expiredManager = createFakeManager(["s1", "s2"]); + const activeManager = createFakeManager(); + let now = 0; + + const managerFactory = vi + .fn() + .mockReturnValueOnce(expiredManager as unknown as MCPClientManager) + .mockReturnValueOnce(activeManager as unknown as MCPClientManager); + const store = new SessionClientManagerStore(managerFactory, { + ttlMs: 1_000, + sweepIntervalMs: 0, + maxEntries: 10, + now: () => now, + }); + + store.getManager("old"); + now = 2_000; + store.getManager("new"); + await Promise.resolve(); + + expect(expiredManager.disconnectServer).toHaveBeenCalledWith("s1"); + expect(expiredManager.disconnectServer).toHaveBeenCalledWith("s2"); + expect(expiredManager.removeServer).toHaveBeenCalledWith("s1"); + expect(expiredManager.removeServer).toHaveBeenCalledWith("s2"); + }); + + it("evicts least-recently-used session when capacity is reached", async () => { + const managerA = createFakeManager(["server-a"]); + const managerB = createFakeManager(); + let now = 0; + + const managerFactory = vi + .fn() + .mockReturnValueOnce(managerA as unknown as MCPClientManager) + .mockReturnValueOnce(managerB as unknown as MCPClientManager); + const store = new SessionClientManagerStore(managerFactory, { + ttlMs: 60_000, + sweepIntervalMs: 60_000, + maxEntries: 1, + now: () => now, + }); + + store.getManager("session-a"); + now = 1_000; + store.getManager("session-b"); + await Promise.resolve(); + + expect(managerA.disconnectServer).toHaveBeenCalledWith("server-a"); + expect(managerA.removeServer).toHaveBeenCalledWith("server-a"); + expect(managerFactory).toHaveBeenCalledTimes(2); + }); +}); diff --git a/mcpjam-inspector/server/services/__tests__/progress-store.test.ts b/mcpjam-inspector/server/services/__tests__/progress-store.test.ts new file mode 100644 index 000000000..a8ce9b979 --- /dev/null +++ b/mcpjam-inspector/server/services/__tests__/progress-store.test.ts @@ -0,0 +1,54 @@ +import { describe, expect, it, vi } from "vitest"; +import { ProgressStore } from "../progress-store.js"; + +describe("ProgressStore", () => { + it("isolates progress entries by session and server", () => { + const store = new ProgressStore(); + store.stopCleanupInterval(); + + store.publish({ + sessionId: "session-a", + serverId: "server-1", + progressToken: "token-a", + progress: 10, + timestamp: "2026-01-01T00:00:00.000Z", + }); + store.publish({ + sessionId: "session-b", + serverId: "server-1", + progressToken: "token-b", + progress: 90, + timestamp: "2026-01-01T00:00:01.000Z", + }); + + const sessionA = store.getAllProgress("server-1", "session-a"); + const sessionB = store.getAllProgress("server-1", "session-b"); + + expect(sessionA).toHaveLength(1); + expect(sessionA[0].progressToken).toBe("token-a"); + expect(sessionB).toHaveLength(1); + expect(sessionB[0].progressToken).toBe("token-b"); + }); + + it("does not emit subscribed progress when server filter is empty", () => { + const store = new ProgressStore(); + store.stopCleanupInterval(); + + const listener = vi.fn(); + const unsubscribe = store.subscribe( + { serverIds: [], sessionId: "session-a" }, + listener, + ); + + store.publish({ + sessionId: "session-a", + serverId: "server-1", + progressToken: "token-a", + progress: 50, + timestamp: "2026-01-01T00:00:00.000Z", + }); + unsubscribe(); + + expect(listener).not.toHaveBeenCalled(); + }); +}); diff --git a/mcpjam-inspector/server/services/__tests__/rpc-log-bus.test.ts b/mcpjam-inspector/server/services/__tests__/rpc-log-bus.test.ts new file mode 100644 index 000000000..c9235caf8 --- /dev/null +++ b/mcpjam-inspector/server/services/__tests__/rpc-log-bus.test.ts @@ -0,0 +1,59 @@ +import { describe, expect, it, vi } from "vitest"; +import { RpcLogBus } from "../rpc-log-bus.js"; + +describe("RpcLogBus", () => { + it("filters buffered events by session and server", () => { + const bus = new RpcLogBus(); + + bus.publish({ + sessionId: "session-a", + serverId: "server-1", + direction: "send", + timestamp: "2026-01-01T00:00:00.000Z", + message: { method: "a" }, + }); + bus.publish({ + sessionId: "session-b", + serverId: "server-1", + direction: "receive", + timestamp: "2026-01-01T00:00:01.000Z", + message: { method: "b" }, + }); + bus.publish({ + sessionId: "session-a", + serverId: "server-2", + direction: "receive", + timestamp: "2026-01-01T00:00:02.000Z", + message: { method: "c" }, + }); + + const events = bus.getBuffer( + { serverIds: ["server-1"], sessionId: "session-a" }, + 10, + ); + + expect(events).toHaveLength(1); + expect(events[0].sessionId).toBe("session-a"); + expect(events[0].serverId).toBe("server-1"); + }); + + it("does not emit events when server filter is empty", () => { + const bus = new RpcLogBus(); + const listener = vi.fn(); + + const unsubscribe = bus.subscribe( + { serverIds: [], sessionId: "session-a" }, + listener, + ); + bus.publish({ + sessionId: "session-a", + serverId: "server-1", + direction: "send", + timestamp: "2026-01-01T00:00:00.000Z", + message: { method: "a" }, + }); + unsubscribe(); + + expect(listener).not.toHaveBeenCalled(); + }); +}); diff --git a/mcpjam-inspector/server/services/client-manager-store.ts b/mcpjam-inspector/server/services/client-manager-store.ts new file mode 100644 index 000000000..7ea73aef5 --- /dev/null +++ b/mcpjam-inspector/server/services/client-manager-store.ts @@ -0,0 +1,212 @@ +import type { MCPClientManager } from "@mcpjam/sdk"; +import { logger as appLogger } from "../utils/logger.js"; + +const DEFAULT_SESSION_KEY = "default"; +const DEFAULT_TTL_MS = 30 * 60 * 1000; +const DEFAULT_SWEEP_INTERVAL_MS = 60 * 1000; +const DEFAULT_MAX_ENTRIES = 1000; + +type ManagerFactory = (sessionKey?: string) => MCPClientManager; + +type SessionEntry = { + manager: MCPClientManager; + lastAccessedAt: number; +}; + +export interface ClientManagerStore { + getManager(sessionKey?: string): MCPClientManager; + dispose(): Promise; +} + +export interface SessionClientManagerStoreOptions { + ttlMs?: number; + sweepIntervalMs?: number; + maxEntries?: number; + now?: () => number; +} + +export interface CreateClientManagerStoreOptions { + hostedMode: boolean; + managerFactory: ManagerFactory; + sessionStoreOptions?: SessionClientManagerStoreOptions; +} + +export function readSessionStoreOptionsFromEnv( + env: NodeJS.ProcessEnv = process.env, +): SessionClientManagerStoreOptions { + return { + ttlMs: readPositiveInteger( + env.MCPJAM_MANAGER_SESSION_TTL_MS, + DEFAULT_TTL_MS, + ), + sweepIntervalMs: readPositiveInteger( + env.MCPJAM_MANAGER_SWEEP_INTERVAL_MS, + DEFAULT_SWEEP_INTERVAL_MS, + ), + maxEntries: readPositiveInteger( + env.MCPJAM_MANAGER_MAX_SESSIONS, + DEFAULT_MAX_ENTRIES, + ), + }; +} + +export function createClientManagerStore({ + hostedMode, + managerFactory, + sessionStoreOptions, +}: CreateClientManagerStoreOptions): ClientManagerStore { + if (!hostedMode) { + return new SingletonClientManagerStore(managerFactory); + } + + return new SessionClientManagerStore(managerFactory, sessionStoreOptions); +} + +export class SingletonClientManagerStore implements ClientManagerStore { + private manager: MCPClientManager | null = null; + + constructor(private readonly managerFactory: ManagerFactory) {} + + getManager(_sessionKey?: string): MCPClientManager { + if (!this.manager) { + this.manager = this.managerFactory(_sessionKey); + } + return this.manager; + } + + async dispose(): Promise { + const manager = this.manager; + this.manager = null; + if (manager) { + await disconnectManager(manager); + } + } +} + +export class SessionClientManagerStore implements ClientManagerStore { + private readonly entries = new Map(); + private readonly ttlMs: number; + private readonly sweepIntervalMs: number; + private readonly maxEntries: number; + private readonly now: () => number; + private lastSweepAt = 0; + + constructor( + private readonly managerFactory: ManagerFactory, + options: SessionClientManagerStoreOptions = {}, + ) { + this.ttlMs = options.ttlMs ?? DEFAULT_TTL_MS; + this.sweepIntervalMs = options.sweepIntervalMs ?? DEFAULT_SWEEP_INTERVAL_MS; + this.maxEntries = options.maxEntries ?? DEFAULT_MAX_ENTRIES; + this.now = options.now ?? (() => Date.now()); + } + + getManager(sessionKey?: string): MCPClientManager { + const key = normalizeSessionKey(sessionKey); + const now = this.now(); + this.maybeSweep(now); + + const existing = this.entries.get(key); + if (existing) { + existing.lastAccessedAt = now; + return existing.manager; + } + + this.evictIfAtCapacity(); + + const manager = this.managerFactory(key); + this.entries.set(key, { manager, lastAccessedAt: now }); + return manager; + } + + async dispose(): Promise { + const managers = Array.from(this.entries.values()).map((e) => e.manager); + this.entries.clear(); + await Promise.all(managers.map((manager) => disconnectManager(manager))); + } + + private maybeSweep(now: number): void { + if (now - this.lastSweepAt < this.sweepIntervalMs) return; + this.lastSweepAt = now; + + for (const [key, entry] of this.entries.entries()) { + if (now - entry.lastAccessedAt >= this.ttlMs) { + this.evictSession(key, "ttl_expired"); + } + } + } + + private evictIfAtCapacity(): void { + if (this.entries.size < this.maxEntries) return; + + let oldestKey: string | null = null; + let oldestAccessedAt = Number.POSITIVE_INFINITY; + + for (const [key, entry] of this.entries.entries()) { + if (entry.lastAccessedAt < oldestAccessedAt) { + oldestAccessedAt = entry.lastAccessedAt; + oldestKey = key; + } + } + + if (oldestKey) { + this.evictSession(oldestKey, "capacity"); + } + } + + private evictSession(sessionKey: string, reason: "ttl_expired" | "capacity") { + const entry = this.entries.get(sessionKey); + if (!entry) return; + + this.entries.delete(sessionKey); + appLogger.debug("[client-manager-store] evict session manager", { + sessionKey, + reason, + }); + void disconnectManager(entry.manager); + } +} + +async function disconnectManager(manager: MCPClientManager): Promise { + const serverIds = manager.listServers(); + + await Promise.all( + serverIds.map(async (serverId) => { + try { + const client = manager.getClient(serverId); + if (client) { + await manager.disconnectServer(serverId); + } + } catch (error) { + appLogger.warn("[client-manager-store] failed to disconnect server", { + serverId, + error, + }); + } + + try { + manager.removeServer(serverId); + } catch (error) { + appLogger.warn("[client-manager-store] failed to remove server", { + serverId, + error, + }); + } + }), + ); +} + +function readPositiveInteger( + rawValue: string | undefined, + fallback: number, +): number { + if (!rawValue) return fallback; + const parsed = Number.parseInt(rawValue, 10); + if (!Number.isFinite(parsed) || parsed <= 0) return fallback; + return parsed; +} + +function normalizeSessionKey(sessionKey?: string): string { + if (!sessionKey) return DEFAULT_SESSION_KEY; + return sessionKey; +} diff --git a/mcpjam-inspector/server/services/progress-store.ts b/mcpjam-inspector/server/services/progress-store.ts index d9654d53f..25e45cb79 100644 --- a/mcpjam-inspector/server/services/progress-store.ts +++ b/mcpjam-inspector/server/services/progress-store.ts @@ -1,6 +1,7 @@ import { EventEmitter } from "events"; export type ProgressEvent = { + sessionId?: string; serverId: string; progressToken: string | number; progress: number; @@ -9,6 +10,11 @@ export type ProgressEvent = { timestamp: string; }; +export type ProgressFilter = { + serverIds: string[]; + sessionId?: string; +}; + // Stale progress entries are cleaned up after this duration (5 minutes) const PROGRESS_STALE_THRESHOLD_MS = 5 * 60 * 1000; // Run cleanup every 60 seconds @@ -20,9 +26,9 @@ const CLEANUP_INTERVAL_MS = 60 * 1000; * Also emits events for real-time streaming to clients. * Automatically cleans up stale progress entries to prevent memory leaks. */ -class ProgressStore { +export class ProgressStore { private readonly emitter = new EventEmitter(); - // Map: serverId -> Map + // Map: sessionKey+serverId -> Map private readonly store = new Map< string, Map @@ -38,10 +44,11 @@ class ProgressStore { * Store a progress update */ publish(event: ProgressEvent): void { - let serverProgress = this.store.get(event.serverId); + const scopedServerKey = getScopedServerKey(event.serverId, event.sessionId); + let serverProgress = this.store.get(scopedServerKey); if (!serverProgress) { serverProgress = new Map(); - this.store.set(event.serverId, serverProgress); + this.store.set(scopedServerKey, serverProgress); } serverProgress.set(event.progressToken, event); this.emitter.emit("progress", event); @@ -94,15 +101,20 @@ class ProgressStore { getProgress( serverId: string, progressToken: string | number, + sessionId?: string, ): ProgressEvent | undefined { - return this.store.get(serverId)?.get(progressToken); + return this.store + .get(getScopedServerKey(serverId, sessionId)) + ?.get(progressToken); } /** * Get all active progress for a server */ - getAllProgress(serverId: string): ProgressEvent[] { - const serverProgress = this.store.get(serverId); + getAllProgress(serverId: string, sessionId?: string): ProgressEvent[] { + const serverProgress = this.store.get( + getScopedServerKey(serverId, sessionId), + ); if (!serverProgress) return []; return Array.from(serverProgress.values()); } @@ -110,8 +122,11 @@ class ProgressStore { /** * Get the most recent progress event for a server (useful when we don't know the token) */ - getLatestProgress(serverId: string): ProgressEvent | undefined { - const all = this.getAllProgress(serverId); + getLatestProgress( + serverId: string, + sessionId?: string, + ): ProgressEvent | undefined { + const all = this.getAllProgress(serverId, sessionId); if (all.length === 0) return undefined; // Return the most recent by timestamp return all.sort( @@ -123,33 +138,48 @@ class ProgressStore { /** * Clear progress for a specific token (e.g., when task completes) */ - clearProgress(serverId: string, progressToken: string | number): void { - this.store.get(serverId)?.delete(progressToken); + clearProgress( + serverId: string, + progressToken: string | number, + sessionId?: string, + ): void { + this.store + .get(getScopedServerKey(serverId, sessionId)) + ?.delete(progressToken); } /** * Clear all progress for a server */ - clearAllProgress(serverId: string): void { - this.store.delete(serverId); + clearAllProgress(serverId: string, sessionId?: string): void { + this.store.delete(getScopedServerKey(serverId, sessionId)); } /** * Subscribe to progress events */ subscribe( - serverIds: string[], + filter: ProgressFilter, listener: (event: ProgressEvent) => void, ): () => void { - const filter = new Set(serverIds); + const serverFilter = new Set(filter.serverIds); + const hasServerFilter = serverFilter.size > 0; + const sessionFilter = filter.sessionId; const handler = (event: ProgressEvent) => { - if (filter.size === 0 || filter.has(event.serverId)) { - listener(event); - } + if (!hasServerFilter) return; + if (!serverFilter.has(event.serverId)) return; + if (sessionFilter !== undefined && event.sessionId !== sessionFilter) + return; + listener(event); }; this.emitter.on("progress", handler); return () => this.emitter.off("progress", handler); } } +function getScopedServerKey(serverId: string, sessionId?: string): string { + const sessionKey = sessionId ?? ""; + return `${sessionKey}::${serverId}`; +} + export const progressStore = new ProgressStore(); diff --git a/mcpjam-inspector/server/services/rpc-log-bus.ts b/mcpjam-inspector/server/services/rpc-log-bus.ts index 78ee34bd9..24897b928 100644 --- a/mcpjam-inspector/server/services/rpc-log-bus.ts +++ b/mcpjam-inspector/server/services/rpc-log-bus.ts @@ -1,13 +1,19 @@ import { EventEmitter } from "events"; export type RpcLogEvent = { + sessionId?: string; serverId: string; direction: "send" | "receive"; timestamp: string; // ISO message: unknown; }; -class RpcLogBus { +export type RpcLogFilter = { + serverIds: string[]; + sessionId?: string; +}; + +export class RpcLogBus { private readonly emitter = new EventEmitter(); private readonly bufferByServer = new Map(); @@ -19,23 +25,42 @@ class RpcLogBus { } subscribe( - serverIds: string[], + filter: RpcLogFilter, listener: (event: RpcLogEvent) => void, ): () => void { - const filter = new Set(serverIds); + const serverFilter = new Set(filter.serverIds); + const hasServerFilter = serverFilter.size > 0; + const sessionFilter = filter.sessionId; const handler = (event: RpcLogEvent) => { - if (filter.size === 0 || filter.has(event.serverId)) listener(event); + if (!hasServerFilter) return; + if (!serverFilter.has(event.serverId)) return; + if (sessionFilter !== undefined && event.sessionId !== sessionFilter) + return; + listener(event); }; this.emitter.on("event", handler); return () => this.emitter.off("event", handler); } - getBuffer(serverIds: string[], limit: number): RpcLogEvent[] { - const filter = new Set(serverIds); + getBuffer(filter: RpcLogFilter, limit: number): RpcLogEvent[] { + const serverFilter = new Set(filter.serverIds); + const hasServerFilter = serverFilter.size > 0; + if (!hasServerFilter) return []; + const all: RpcLogEvent[] = []; + const sessionFilter = filter.sessionId; for (const [serverId, buf] of this.bufferByServer.entries()) { - if (filter.size > 0 && !filter.has(serverId)) continue; - all.push(...buf); + if (!serverFilter.has(serverId)) continue; + if (sessionFilter === undefined) { + all.push(...buf); + continue; + } + + for (const event of buf) { + if (event.sessionId === sessionFilter) { + all.push(event); + } + } } // If limit is 0, return empty array (no replay) if (limit === 0) return []; diff --git a/mcpjam-inspector/server/types/hono.ts b/mcpjam-inspector/server/types/hono.ts index 3936859b5..ee0fd8e77 100644 --- a/mcpjam-inspector/server/types/hono.ts +++ b/mcpjam-inspector/server/types/hono.ts @@ -4,5 +4,6 @@ import type { MCPClientManager } from "@mcpjam/sdk"; declare module "hono" { interface Context { mcpClientManager: MCPClientManager; + mcpSessionId?: string; } }