From 26d04dde3013ca6a0bbeab3cb1c5538c3fb43553 Mon Sep 17 00:00:00 2001 From: gargameljyh <1954914724@qq.com> Date: Fri, 20 Jun 2025 02:44:54 -0700 Subject: [PATCH 1/5] feat(mcp-connector): add sse endpoint --- demo-server/src/connector.ts | 27 ++++++-- demo-server/src/index.ts | 10 +-- demo/src/mcp/index.js | 18 +++--- packages/mcp/mcp-connector/src/index.ts | 1 + packages/mcp/mcp-connector/src/sse/index.ts | 3 + .../src/sse/sse-client-endpoint.ts | 64 +++++++++++++++++++ .../src/sse/sse-endpoint-server.ts | 39 +++++++++++ .../src/sse/sse-server-endpoint.ts | 39 +++++++++++ 8 files changed, 183 insertions(+), 18 deletions(-) create mode 100644 packages/mcp/mcp-connector/src/sse/index.ts create mode 100644 packages/mcp/mcp-connector/src/sse/sse-client-endpoint.ts create mode 100644 packages/mcp/mcp-connector/src/sse/sse-endpoint-server.ts create mode 100644 packages/mcp/mcp-connector/src/sse/sse-server-endpoint.ts diff --git a/demo-server/src/connector.ts b/demo-server/src/connector.ts index 5d82a28..972db95 100644 --- a/demo-server/src/connector.ts +++ b/demo-server/src/connector.ts @@ -1,4 +1,10 @@ -import { WebSocketServerEndpoint, ConnectorCenter, WebSocketEndpointServer } from '@opentiny/tiny-agent-mcp-connector'; +import { + WebSocketServerEndpoint, + ConnectorCenter, + WebSocketEndpointServer, + SSEEndpointServer, + SSEServerEndpoint, +} from '@opentiny/tiny-agent-mcp-connector'; import type { Request } from 'express'; import type { WebSocket } from 'ws'; @@ -7,12 +13,23 @@ export function createConnector() { const webSocketEndpointServer = new WebSocketEndpointServer({ noServer: true }, connectorCenter); webSocketEndpointServer.start(); - const websocketConnectionHandler = async(ws: WebSocket, req: Request) => { + const websocketConnectionHandler = async (ws: WebSocket, req: Request) => { webSocketEndpointServer.wss.emit('connection', ws, req); - } + }; return { connectorCenter, webSocketEndpointServer, - websocketConnectionHandler - } + websocketConnectionHandler, + }; +} + +export function createSSEConnector() { + const connectorCenter = new ConnectorCenter(); + const sseEndpointServer = new SSEEndpointServer({ port: 8082 }, connectorCenter); + sseEndpointServer.start(); + + return { + connectorCenter, + sseEndpointServer, + }; } diff --git a/demo-server/src/index.ts b/demo-server/src/index.ts index c0008a5..562c0e0 100644 --- a/demo-server/src/index.ts +++ b/demo-server/src/index.ts @@ -2,7 +2,7 @@ import express, { type Request } from 'express'; import expressWs from 'express-ws'; import cors from 'cors'; import dotenv from 'dotenv'; -import { createConnector } from './connector'; +import { createConnector, createSSEConnector } from './connector'; import { createProxyServer } from './proxy-server'; import { createChat } from './chat'; @@ -30,13 +30,13 @@ const chatConfigFn = (req: Request) => ({ 'connector-client-id': req.headers['connector-client-id'], 'mcp-verify-code': req.headers['mcp-verify-code'], }, - timeout: 60 + timeout: 60, }, }, }, }); -const { connectorCenter, websocketConnectionHandler } = createConnector(); +const { connectorCenter } = createSSEConnector(); const { sseHandlers, streamableHttpHandlers } = createProxyServer({ connectorCenter }); const { chatHandler } = createChat(chatConfigFn); @@ -44,8 +44,8 @@ const app = express(); expressWs(app); app.use(cors()); -// connector -(app as unknown as expressWs.WithWebsocketMethod).ws('/ws', websocketConnectionHandler); +// // connector +// (app as unknown as expressWs.WithWebsocketMethod).ws('/ws', websocketConnectionHandler); // mcp server app.get('/mcp', streamableHttpHandlers.sessionHandler); diff --git a/demo/src/mcp/index.js b/demo/src/mcp/index.js index a53c273..45e2265 100644 --- a/demo/src/mcp/index.js +++ b/demo/src/mcp/index.js @@ -1,15 +1,16 @@ -import { EndpointTransport, WebSocketClientEndpoint } from '@opentiny/tiny-agent-mcp-connector'; +import { EndpointTransport, WebSocketClientEndpoint, SSEClientEndpoint } from '@opentiny/tiny-agent-mcp-connector'; import { McpValidator } from '@opentiny/tiny-agent-mcp-service'; import { setupMcpService } from '@opentiny/tiny-agent-mcp-service-vue'; import { McpToolParser } from '@opentiny/tiny-agent-task-mcp'; -import { useTaskScheduler } from "./scheduler"; +import { useTaskScheduler } from './scheduler'; import mcpToolJson from './mcp-tool.json'; export function initMcp() { // Connector + const sseEndpoint = new SSEClientEndpoint('http://localhost:8082/client'); const wsEndpoint = new WebSocketClientEndpoint({ url: import.meta.env.VITE_CONNECTOR_ENDPOINT_URL }); - const endpointTransport = new EndpointTransport(wsEndpoint); - + const endpointTransport = new EndpointTransport(sseEndpoint); + // MCP Service const mcpService = setupMcpService(); mcpService.mcpServer.connect(endpointTransport); @@ -19,9 +20,9 @@ export function initMcp() { mcpService.setValidator(mcpValidator); // Task Scheduler - const {taskScheduler, actionManager} = useTaskScheduler(); + const { taskScheduler, actionManager } = useTaskScheduler(); const doTask = async (task) => taskScheduler.pushTask(task); - + // MCP Tool Parser & mcp-tool.json const mcpToolParser = new McpToolParser(doTask); mcpToolParser.extractAllTools(mcpToolJson).forEach((tool) => { @@ -29,12 +30,13 @@ export function initMcp() { }); return { + sseEndpoint, wsEndpoint, endpointTransport, mcpService, mcpValidator, taskScheduler, actionManager, - mcpToolParser - } + mcpToolParser, + }; } diff --git a/packages/mcp/mcp-connector/src/index.ts b/packages/mcp/mcp-connector/src/index.ts index afaa28e..7907e27 100644 --- a/packages/mcp/mcp-connector/src/index.ts +++ b/packages/mcp/mcp-connector/src/index.ts @@ -2,3 +2,4 @@ export * from './endpoint.type'; export * from './endpoint-transport'; export * from './connector-center'; export * from './websocket'; +export * from './sse'; diff --git a/packages/mcp/mcp-connector/src/sse/index.ts b/packages/mcp/mcp-connector/src/sse/index.ts new file mode 100644 index 0000000..d555f42 --- /dev/null +++ b/packages/mcp/mcp-connector/src/sse/index.ts @@ -0,0 +1,3 @@ +export * from './sse-client-endpoint'; +export * from './sse-endpoint-server'; +export * from './sse-server-endpoint'; diff --git a/packages/mcp/mcp-connector/src/sse/sse-client-endpoint.ts b/packages/mcp/mcp-connector/src/sse/sse-client-endpoint.ts new file mode 100644 index 0000000..a5b5eae --- /dev/null +++ b/packages/mcp/mcp-connector/src/sse/sse-client-endpoint.ts @@ -0,0 +1,64 @@ +import { JSONRPCMessage } from '@modelcontextprotocol/sdk/types'; +import { EndpointMessageType, IConnectorEndpoint, IEndpointMessage } from '../endpoint.type'; + +export class SSEClientEndpoint implements IConnectorEndpoint { + public clientId!: string | number; + public clientIdResolved: Promise; + protected clientIdResolver!: (id: string | number) => void; + protected eventSource!: EventSource; + protected url: string; + protected config?: EventSourceInit; + + constructor(url: string, config?: EventSourceInit) { + this.url = url; + this.config = config; + this.clientIdResolved = new Promise((resolve) => { + this.clientIdResolver = resolve; + }); + } + start(): Promise { + return new Promise((resolve) => { + this.eventSource = new EventSource(this.url, this.config); + + this.eventSource.onopen = () => { + const message = { + type: EndpointMessageType.INITIALIZE, + }; + this.send(message); + }; + + this.eventSource.onerror = (error) => { + console.error('SSE error:', error); + this.onerror?.(error as any); + }; + + this.eventSource.onmessage = (messageEvent: MessageEvent) => { + const message: IEndpointMessage = JSON.parse(messageEvent.data); + if (message.type === EndpointMessageType.INITIALIZE) { + this.clientId = (message.data as any).clientId; + this.clientIdResolver(this.clientId); + resolve(); + return; + } + this.onmessage?.(message); + }; + }); + } + async close(): Promise { + this.eventSource.close(); + this.onclose?.(); + } + async send(message: IEndpointMessage): Promise { + if (message.type !== EndpointMessageType.INITIALIZE) { + await this.clientIdResolved; + } + + fetch(`${this.url}/message`, { + method: 'POST', + body: JSON.stringify(message), + }); + } + onmessage?: ((message: IEndpointMessage) => void) | null | undefined; + onclose?: (() => void) | null | undefined; + onerror?: ((error: Error) => void) | null | undefined; +} diff --git a/packages/mcp/mcp-connector/src/sse/sse-endpoint-server.ts b/packages/mcp/mcp-connector/src/sse/sse-endpoint-server.ts new file mode 100644 index 0000000..e52460e --- /dev/null +++ b/packages/mcp/mcp-connector/src/sse/sse-endpoint-server.ts @@ -0,0 +1,39 @@ +import express, { type Request, type Response } from 'express'; +import { ConnectorCenter } from '../connector-center'; +import { EndpointMessageType } from '../endpoint.type'; +import { genId } from '../utils'; +import { SSEServerEndpoint } from './sse-server-endpoint'; + +export class SSEEndpointServer { + public app: express.Application; + protected connectorCenter: ConnectorCenter; + + constructor(config: { port: number }, connectorCenter: ConnectorCenter) { + this.app = express(); + this.connectorCenter = connectorCenter; + this.app.listen(config.port); + } + + start() { + this.app.all('/client', (req: Request, res: Response) => { + const clientId = genId(); + const endpoint = new SSEServerEndpoint(this.app, res, clientId); + + endpoint.start(); + + this.connectorCenter.setClient(clientId, endpoint); + const message = JSON.parse(req.body); + if (message.type === EndpointMessageType.INITIALIZE) { + res.set('Content-Type', 'text/event-stream'); + res.send( + JSON.stringify({ + type: EndpointMessageType.INITIALIZE, + data: { + clientId, + }, + }), + ); + } + }); + } +} diff --git a/packages/mcp/mcp-connector/src/sse/sse-server-endpoint.ts b/packages/mcp/mcp-connector/src/sse/sse-server-endpoint.ts new file mode 100644 index 0000000..d1b246e --- /dev/null +++ b/packages/mcp/mcp-connector/src/sse/sse-server-endpoint.ts @@ -0,0 +1,39 @@ +import express, { type Request, type Response } from 'express'; +import { JSONRPCMessage } from '@modelcontextprotocol/sdk/types'; +import { EndpointMessageType, IConnectorEndpoint, IEndpointMessage } from '../endpoint.type'; + +export class SSEServerEndpoint implements IConnectorEndpoint { + protected app: express.Application; + protected res: Response; + public clientId: string; + public clientIdResolved: Promise; + + constructor(app: express.Application, res: Response, clientId: string) { + this.app = app; + this.res = res; + this.clientId = clientId; + this.clientIdResolved = Promise.resolve(clientId); + } + + start(): Promise { + return new Promise(() => { + this.app.post('/message', (req: Request) => { + const message: IEndpointMessage = JSON.parse(req.body); + if (message.type === EndpointMessageType.INITIALIZE) { + return; + } + + this.onmessage?.(message); + }); + }); + } + async close(): Promise { + this.res.end(); + } + async send(message: IEndpointMessage): Promise { + this.res.write(`data: ${JSON.stringify(message)}\n`); + } + onmessage?: ((message: IEndpointMessage) => void) | null | undefined; + onclose?: (() => void) | null | undefined; + onerror?: ((error: Error) => void) | null | undefined; +} From e6a78ad50861f17b2a049301f62c844cd5960c32 Mon Sep 17 00:00:00 2001 From: gargameljyh <1954914724@qq.com> Date: Fri, 20 Jun 2025 04:18:26 -0700 Subject: [PATCH 2/5] feat(mcp-connector): use http connect replace express --- demo-server/src/index.ts | 1 + demo/src/mcp/index.js | 1 + .../src/sse/sse-client-endpoint.ts | 11 ++--- .../src/sse/sse-endpoint-server.ts | 42 +++++++++++-------- .../src/sse/sse-server-endpoint.ts | 38 ++++++++++++----- 5 files changed, 58 insertions(+), 35 deletions(-) diff --git a/demo-server/src/index.ts b/demo-server/src/index.ts index 562c0e0..a0cce07 100644 --- a/demo-server/src/index.ts +++ b/demo-server/src/index.ts @@ -36,6 +36,7 @@ const chatConfigFn = (req: Request) => ({ }, }); +// const { connectorCenter, websocketConnectionHandler } = createConnector(); const { connectorCenter } = createSSEConnector(); const { sseHandlers, streamableHttpHandlers } = createProxyServer({ connectorCenter }); const { chatHandler } = createChat(chatConfigFn); diff --git a/demo/src/mcp/index.js b/demo/src/mcp/index.js index 45e2265..e88306a 100644 --- a/demo/src/mcp/index.js +++ b/demo/src/mcp/index.js @@ -10,6 +10,7 @@ export function initMcp() { const sseEndpoint = new SSEClientEndpoint('http://localhost:8082/client'); const wsEndpoint = new WebSocketClientEndpoint({ url: import.meta.env.VITE_CONNECTOR_ENDPOINT_URL }); const endpointTransport = new EndpointTransport(sseEndpoint); + // const endpointTransport = new EndpointTransport(wsEndpoint); // MCP Service const mcpService = setupMcpService(); diff --git a/packages/mcp/mcp-connector/src/sse/sse-client-endpoint.ts b/packages/mcp/mcp-connector/src/sse/sse-client-endpoint.ts index a5b5eae..19a41d1 100644 --- a/packages/mcp/mcp-connector/src/sse/sse-client-endpoint.ts +++ b/packages/mcp/mcp-connector/src/sse/sse-client-endpoint.ts @@ -20,12 +20,7 @@ export class SSEClientEndpoint implements IConnectorEndpoint { return new Promise((resolve) => { this.eventSource = new EventSource(this.url, this.config); - this.eventSource.onopen = () => { - const message = { - type: EndpointMessageType.INITIALIZE, - }; - this.send(message); - }; + this.eventSource.onopen = () => {}; this.eventSource.onerror = (error) => { console.error('SSE error:', error); @@ -53,7 +48,9 @@ export class SSEClientEndpoint implements IConnectorEndpoint { await this.clientIdResolved; } - fetch(`${this.url}/message`, { + const url = URL.parse(this.url); + + fetch(`${url?.origin}/message`, { method: 'POST', body: JSON.stringify(message), }); diff --git a/packages/mcp/mcp-connector/src/sse/sse-endpoint-server.ts b/packages/mcp/mcp-connector/src/sse/sse-endpoint-server.ts index e52460e..890db63 100644 --- a/packages/mcp/mcp-connector/src/sse/sse-endpoint-server.ts +++ b/packages/mcp/mcp-connector/src/sse/sse-endpoint-server.ts @@ -1,37 +1,43 @@ -import express, { type Request, type Response } from 'express'; +import * as http from 'node:http'; import { ConnectorCenter } from '../connector-center'; import { EndpointMessageType } from '../endpoint.type'; import { genId } from '../utils'; import { SSEServerEndpoint } from './sse-server-endpoint'; export class SSEEndpointServer { - public app: express.Application; + public app: http.Server; protected connectorCenter: ConnectorCenter; constructor(config: { port: number }, connectorCenter: ConnectorCenter) { - this.app = express(); + this.app = http.createServer(); this.connectorCenter = connectorCenter; this.app.listen(config.port); } start() { - this.app.all('/client', (req: Request, res: Response) => { - const clientId = genId(); - const endpoint = new SSEServerEndpoint(this.app, res, clientId); + this.app.on('request', (req, res) => { + if (req.url === '/client') { + const clientId = genId(); + const endpoint = new SSEServerEndpoint(this.app, res, clientId); - endpoint.start(); + endpoint.start(); - this.connectorCenter.setClient(clientId, endpoint); - const message = JSON.parse(req.body); - if (message.type === EndpointMessageType.INITIALIZE) { - res.set('Content-Type', 'text/event-stream'); - res.send( - JSON.stringify({ - type: EndpointMessageType.INITIALIZE, - data: { - clientId, - }, - }), + this.connectorCenter.setClient(clientId, endpoint); + + res.setHeader('Content-Type', 'text/event-stream'); + res.setHeader('Cache-Control', 'no-cache'); + res.setHeader('Connection', 'keep-alive'); + res.setHeader('Access-Control-Allow-Origin', '*'); + res.setHeader('Access-Control-Allow-Methods', '*'); + res.write( + 'data: ' + + JSON.stringify({ + type: EndpointMessageType.INITIALIZE, + data: { + clientId, + }, + }) + + '\n\n', ); } }); diff --git a/packages/mcp/mcp-connector/src/sse/sse-server-endpoint.ts b/packages/mcp/mcp-connector/src/sse/sse-server-endpoint.ts index d1b246e..b4298a3 100644 --- a/packages/mcp/mcp-connector/src/sse/sse-server-endpoint.ts +++ b/packages/mcp/mcp-connector/src/sse/sse-server-endpoint.ts @@ -1,14 +1,14 @@ -import express, { type Request, type Response } from 'express'; +import * as http from 'node:http'; import { JSONRPCMessage } from '@modelcontextprotocol/sdk/types'; import { EndpointMessageType, IConnectorEndpoint, IEndpointMessage } from '../endpoint.type'; export class SSEServerEndpoint implements IConnectorEndpoint { - protected app: express.Application; - protected res: Response; + protected app: http.Server; + protected res: any; public clientId: string; public clientIdResolved: Promise; - constructor(app: express.Application, res: Response, clientId: string) { + constructor(app: http.Server, res: any, clientId: string) { this.app = app; this.res = res; this.clientId = clientId; @@ -17,13 +17,31 @@ export class SSEServerEndpoint implements IConnectorEndpoint { start(): Promise { return new Promise(() => { - this.app.post('/message', (req: Request) => { - const message: IEndpointMessage = JSON.parse(req.body); - if (message.type === EndpointMessageType.INITIALIZE) { - return; - } + this.app.on('request', (req, res) => { + if (req.url === '/message') { + res.setHeader('Access-Control-Allow-Origin', '*'); + res.setHeader('Access-Control-Allow-Methods', '*'); + + let body = ''; + + req.on('data', (chunk) => { + body += chunk.toString(); + }); - this.onmessage?.(message); + req.on('end', () => { + try { + const message = JSON.parse(body); + if (message.type === EndpointMessageType.INITIALIZE) { + res.end(); + return; + } + + this.onmessage?.(message); + } finally { + res.end(); + } + }); + } }); }); } From 139ec1929221c7342df9e4a0ed24bdb8dc4fe1a4 Mon Sep 17 00:00:00 2001 From: gargameljyh <1954914724@qq.com> Date: Sun, 22 Jun 2025 19:56:53 -0700 Subject: [PATCH 3/5] docs(mcp-connector): add sse connector docs --- docs/.vitepress/config.mts | 2 +- docs/src/extensions/connector.md | 222 +++++++++++++++++++++++++++++++ docs/src/extensions/extension.md | 4 + 3 files changed, 227 insertions(+), 1 deletion(-) create mode 100644 docs/src/extensions/connector.md diff --git a/docs/.vitepress/config.mts b/docs/.vitepress/config.mts index 359e2c2..9cfa2e1 100644 --- a/docs/.vitepress/config.mts +++ b/docs/.vitepress/config.mts @@ -134,7 +134,7 @@ export default defineConfig({ { text: '连接器扩展', base: '/extensions/', - items: [{ text: '待补充', link: '' }], + items: [{ text: '自定义Connector', link: 'connector' }], }, ], }, diff --git a/docs/src/extensions/connector.md b/docs/src/extensions/connector.md new file mode 100644 index 0000000..e979887 --- /dev/null +++ b/docs/src/extensions/connector.md @@ -0,0 +1,222 @@ +# 自定义Connector + +本章节主要介绍如何实现一个自定义的通信层Connector,用以客户端与MCP服务端之间进行通信访问 + +## 自定义服务端 + +服务端通信需要以 TinyAgent 规定的 IConnectorEndpoint 协议为基础,当然客户端也是基于此协议进行拓展。 +IConnectorEndpoint 协议要求服务端关注消息的接收发送以及启动(初始化)服务端实例,以下是 IConnectorEndpoint 协议的内容 + +```typescript +interface IConnectorEndpoint { + clientId: string | number; + clientIdResolved: Promise; + + start(): Promise; + close(): Promise; + + send(message: IEndpointMessage): Promise; + onmessage?: ((message: IEndpointMessage) => void) | null; + + onclose?: (() => void) | null; + onerror?: ((error: Error) => void) | null; +} +``` + +接下来基于 IConnectorEndpoint 协议实现服务端,服务端主要关注于通信,通信方式使用SSE通信方法 + +```typescript +import * as http from 'node:http'; +import { JSONRPCMessage } from '@modelcontextprotocol/sdk/types'; +import { EndpointMessageType, IConnectorEndpoint, IEndpointMessage } from '../endpoint.type'; + +class SSEServerEndpoint implements IConnectorEndpoint { + protected app: http.Server; + protected res: any; + public clientId: string; + public clientIdResolved: Promise; + + constructor(app: http.Server, res: any, clientId: string) { + this.app = app; + this.res = res; + this.clientId = clientId; + this.clientIdResolved = Promise.resolve(clientId); + } + + start(): Promise { + return new Promise(() => { + // 订阅http请求 + this.app.on('request', (req, res) => { + // 定义/message api 用以接收客户端的请求内容 + if (req.url === '/message') { + // 解决跨域问题 + res.setHeader('Access-Control-Allow-Origin', '*'); + res.setHeader('Access-Control-Allow-Methods', '*'); + + // 读取请求体 + let body = ''; + req.on('data', (chunk) => { + body += chunk.toString(); + }); + + // 响应请求 + req.on('end', () => { + try { + const message = JSON.parse(body); + if (message.type === EndpointMessageType.INITIALIZE) { + res.end(); + return; + } + + this.onmessage?.(message); + } finally { + res.end(); + } + }); + } + }); + }); + } + // 关闭SSE连接 + async close(): Promise { + this.res.end(); + } + // 使用SSE连接向客户端推送消息 + async send(message: IEndpointMessage): Promise { + this.res.write(`data: ${JSON.stringify(message)}\n`); + } + onmessage?: ((message: IEndpointMessage) => void) | null | undefined; + onclose?: (() => void) | null | undefined; + onerror?: ((error: Error) => void) | null | undefined; +} +``` + +为使模块责任清晰,服务端的启动以及ClientId的初始化工作交给了Server模块,该模块并不负责通信,以下是Server模块的代码示例 + +```typescript +import * as http from 'node:http'; +import { ConnectorCenter } from '../connector-center'; +import { EndpointMessageType } from '../endpoint.type'; +import { genId } from '../utils'; +import { SSEServerEndpoint } from './sse-server-endpoint'; + +class SSEEndpointServer { + public app: http.Server; + protected connectorCenter: ConnectorCenter; + + constructor(config: { port: number }, connectorCenter: ConnectorCenter) { + // 启动http服务器 + this.app = http.createServer(); + this.connectorCenter = connectorCenter; + this.app.listen(config.port); + } + + start() { + this.app.on('request', (req, res) => { + // 定义/client api用以初始化ClientId + if (req.url === '/client') { + const clientId = genId(); + const endpoint = new SSEServerEndpoint(this.app, res, clientId); + + endpoint.start(); + + this.connectorCenter.setClient(clientId, endpoint); + + // SSE方式通信 + res.setHeader('Content-Type', 'text/event-stream'); + res.setHeader('Cache-Control', 'no-cache'); + res.setHeader('Connection', 'keep-alive'); + // 解决跨域问题 + res.setHeader('Access-Control-Allow-Origin', '*'); + res.setHeader('Access-Control-Allow-Methods', '*'); + // 推送初始化的ClientId + res.write( + 'data: ' + + JSON.stringify({ + type: EndpointMessageType.INITIALIZE, + data: { + clientId, + }, + }) + + '\n\n', + ); + } + }); + } +} +``` + +## 自定义客户端 + +下面是SSE客户端的实现,同意基于IConnectorEndpoint 协议 + +```typescript +import { JSONRPCMessage } from '@modelcontextprotocol/sdk/types'; +import { EndpointMessageType, IConnectorEndpoint, IEndpointMessage } from '../endpoint.type'; + +class SSEClientEndpoint implements IConnectorEndpoint { + public clientId!: string | number; + public clientIdResolved: Promise; + protected clientIdResolver!: (id: string | number) => void; + // SSE连接实例 + protected eventSource!: EventSource; + // SSE连接url + protected url: string; + // SSE连接配置项 + protected config?: EventSourceInit; + + constructor(url: string, config?: EventSourceInit) { + this.url = url; + this.config = config; + this.clientIdResolved = new Promise((resolve) => { + this.clientIdResolver = resolve; + }); + } + start(): Promise { + return new Promise((resolve) => { + // 创建SSE连接实例 + this.eventSource = new EventSource(this.url, this.config); + + this.eventSource.onopen = () => {}; + + this.eventSource.onerror = (error) => { + console.error('SSE error:', error); + this.onerror?.(error as any); + }; + + // 接收服务端消息 + this.eventSource.onmessage = (messageEvent: MessageEvent) => { + const message: IEndpointMessage = JSON.parse(messageEvent.data); + if (message.type === EndpointMessageType.INITIALIZE) { + this.clientId = (message.data as any).clientId; + this.clientIdResolver(this.clientId); + resolve(); + return; + } + this.onmessage?.(message); + }; + }); + } + async close(): Promise { + this.eventSource.close(); + this.onclose?.(); + } + // 由于SSE是用以服务端向客户端推送消息,后续客户端主动请求则是以HTTP进行 + async send(message: IEndpointMessage): Promise { + if (message.type !== EndpointMessageType.INITIALIZE) { + await this.clientIdResolved; + } + + const url = URL.parse(this.url); + + // 请求/message api 向服务端发送请求 + fetch(`${url?.origin}/message`, { + method: 'POST', + body: JSON.stringify(message), + }); + } + onmessage?: ((message: IEndpointMessage) => void) | null | undefined; + onclose?: (() => void) | null | undefined; + onerror?: ((error: Error) => void) | null | undefined; +} +``` diff --git a/docs/src/extensions/extension.md b/docs/src/extensions/extension.md index da9f74c..7bf51ee 100644 --- a/docs/src/extensions/extension.md +++ b/docs/src/extensions/extension.md @@ -11,3 +11,7 @@ ### 调度器扩展 - [自定义调度器界面UI](/extensions/custom-ui.md) + +### 连接器扩展 + +- [自定义连接器](/extensions/connector.md) From 55c766fa0bc8c614bbb2eb1deab7b025b6377ada Mon Sep 17 00:00:00 2001 From: gargameljyh <1954914724@qq.com> Date: Mon, 23 Jun 2025 23:56:10 -0700 Subject: [PATCH 4/5] docs(mcp-connect): fix coderabbit suggestion --- demo-server/src/connector.ts | 10 +- demo/.env.development | 3 +- demo/src/mcp/index.js | 4 +- docs/src/extensions/connector.md | 91 ++++++++++--------- .../src/sse/sse-client-endpoint.ts | 23 +++-- .../src/sse/sse-endpoint-server.ts | 4 +- .../src/sse/sse-server-endpoint.ts | 48 +++++----- 7 files changed, 103 insertions(+), 80 deletions(-) diff --git a/demo-server/src/connector.ts b/demo-server/src/connector.ts index 972db95..915202f 100644 --- a/demo-server/src/connector.ts +++ b/demo-server/src/connector.ts @@ -25,9 +25,17 @@ export function createConnector() { export function createSSEConnector() { const connectorCenter = new ConnectorCenter(); - const sseEndpointServer = new SSEEndpointServer({ port: 8082 }, connectorCenter); + const port = 8082; + const sseEndpointServer = new SSEEndpointServer({ port }, connectorCenter); sseEndpointServer.start(); + try { + sseEndpointServer.start(); + } catch (error) { + console.error(`Failed to start SSE endpoint server on port ${port}:`, error); + throw error; + } + return { connectorCenter, sseEndpointServer, diff --git a/demo/.env.development b/demo/.env.development index 5de55c5..e612661 100644 --- a/demo/.env.development +++ b/demo/.env.development @@ -1,2 +1,3 @@ -VITE_CONNECTOR_ENDPOINT_URL=ws://localhost:3001/ws +VITE_CONNECTOR_WS_ENDPOINT_URL=ws://localhost:3001/ws +VITE_CONNECTOR_SSE_ENDPOINT_URL=http://localhost:8082/client VITE_CHAT_URL=http://localhost:3001/chat diff --git a/demo/src/mcp/index.js b/demo/src/mcp/index.js index e88306a..a6bd841 100644 --- a/demo/src/mcp/index.js +++ b/demo/src/mcp/index.js @@ -7,8 +7,8 @@ import mcpToolJson from './mcp-tool.json'; export function initMcp() { // Connector - const sseEndpoint = new SSEClientEndpoint('http://localhost:8082/client'); - const wsEndpoint = new WebSocketClientEndpoint({ url: import.meta.env.VITE_CONNECTOR_ENDPOINT_URL }); + const sseEndpoint = new SSEClientEndpoint(import.meta.env.VITE_CONNECTOR_SSE_ENDPOINT_URL); + const wsEndpoint = new WebSocketClientEndpoint({ url: import.meta.env.VITE_CONNECTOR_WS_ENDPOINT_URL }); const endpointTransport = new EndpointTransport(sseEndpoint); // const endpointTransport = new EndpointTransport(wsEndpoint); diff --git a/docs/src/extensions/connector.md b/docs/src/extensions/connector.md index e979887..441455a 100644 --- a/docs/src/extensions/connector.md +++ b/docs/src/extensions/connector.md @@ -32,7 +32,7 @@ import { EndpointMessageType, IConnectorEndpoint, IEndpointMessage } from '../en class SSEServerEndpoint implements IConnectorEndpoint { protected app: http.Server; - protected res: any; + protected res: http.ServerResponse; public clientId: string; public clientIdResolved: Promise; @@ -43,38 +43,36 @@ class SSEServerEndpoint implements IConnectorEndpoint { this.clientIdResolved = Promise.resolve(clientId); } - start(): Promise { - return new Promise(() => { - // 订阅http请求 - this.app.on('request', (req, res) => { - // 定义/message api 用以接收客户端的请求内容 - if (req.url === '/message') { - // 解决跨域问题 - res.setHeader('Access-Control-Allow-Origin', '*'); - res.setHeader('Access-Control-Allow-Methods', '*'); - - // 读取请求体 - let body = ''; - req.on('data', (chunk) => { - body += chunk.toString(); - }); - - // 响应请求 - req.on('end', () => { - try { - const message = JSON.parse(body); - if (message.type === EndpointMessageType.INITIALIZE) { - res.end(); - return; - } - - this.onmessage?.(message); - } finally { + async start(): Promise { + // 订阅http请求 + this.app.on('request', (req, res) => { + // 定义/message api 用以接收客户端的请求内容 + if (req.url === '/message') { + // 解决跨域问题 + res.setHeader('Access-Control-Allow-Origin', '*'); + res.setHeader('Access-Control-Allow-Methods', '*'); + + // 读取请求体 + let body = ''; + req.on('data', (chunk) => { + body += chunk.toString(); + }); + + // 响应请求 + req.on('end', () => { + try { + const message = JSON.parse(body); + if (message.type === EndpointMessageType.INITIALIZE) { res.end(); + return; } - }); - } - }); + + this.onmessage?.(message); + } finally { + res.end(); + } + }); + } }); } // 关闭SSE连接 @@ -83,7 +81,7 @@ class SSEServerEndpoint implements IConnectorEndpoint { } // 使用SSE连接向客户端推送消息 async send(message: IEndpointMessage): Promise { - this.res.write(`data: ${JSON.stringify(message)}\n`); + this.res.write(`data: ${JSON.stringify(message)}\n\n`); } onmessage?: ((message: IEndpointMessage) => void) | null | undefined; onclose?: (() => void) | null | undefined; @@ -102,16 +100,18 @@ import { SSEServerEndpoint } from './sse-server-endpoint'; class SSEEndpointServer { public app: http.Server; + protected port: number; protected connectorCenter: ConnectorCenter; constructor(config: { port: number }, connectorCenter: ConnectorCenter) { // 启动http服务器 this.app = http.createServer(); + this.port = config.port; this.connectorCenter = connectorCenter; - this.app.listen(config.port); } start() { + this.app.listen(config.port); this.app.on('request', (req, res) => { // 定义/client api用以初始化ClientId if (req.url === '/client') { @@ -203,17 +203,24 @@ class SSEClientEndpoint implements IConnectorEndpoint { } // 由于SSE是用以服务端向客户端推送消息,后续客户端主动请求则是以HTTP进行 async send(message: IEndpointMessage): Promise { - if (message.type !== EndpointMessageType.INITIALIZE) { - await this.clientIdResolved; - } + try { + if (message.type !== EndpointMessageType.INITIALIZE) { + await this.clientIdResolved; + } - const url = URL.parse(this.url); + const url = URL.parse(this.url); - // 请求/message api 向服务端发送请求 - fetch(`${url?.origin}/message`, { - method: 'POST', - body: JSON.stringify(message), - }); + // 请求/message api 向服务端发送请求 + fetch(`${url?.origin}/message`, { + method: 'POST', + body: JSON.stringify(message), + headers: { + 'Content-Type': 'application/json', + }, + }); + } catch (e) { + this.onerror?.(e as Error); + } } onmessage?: ((message: IEndpointMessage) => void) | null | undefined; onclose?: (() => void) | null | undefined; diff --git a/packages/mcp/mcp-connector/src/sse/sse-client-endpoint.ts b/packages/mcp/mcp-connector/src/sse/sse-client-endpoint.ts index 19a41d1..4621c95 100644 --- a/packages/mcp/mcp-connector/src/sse/sse-client-endpoint.ts +++ b/packages/mcp/mcp-connector/src/sse/sse-client-endpoint.ts @@ -44,16 +44,23 @@ export class SSEClientEndpoint implements IConnectorEndpoint { this.onclose?.(); } async send(message: IEndpointMessage): Promise { - if (message.type !== EndpointMessageType.INITIALIZE) { - await this.clientIdResolved; - } + try { + if (message.type !== EndpointMessageType.INITIALIZE) { + await this.clientIdResolved; + } - const url = URL.parse(this.url); + const url = URL.parse(this.url); - fetch(`${url?.origin}/message`, { - method: 'POST', - body: JSON.stringify(message), - }); + fetch(`${url?.origin}/message`, { + method: 'POST', + body: JSON.stringify(message), + headers: { + 'Content-Type': 'application/json', + }, + }); + } catch (e) { + this.onerror?.(e as Error); + } } onmessage?: ((message: IEndpointMessage) => void) | null | undefined; onclose?: (() => void) | null | undefined; diff --git a/packages/mcp/mcp-connector/src/sse/sse-endpoint-server.ts b/packages/mcp/mcp-connector/src/sse/sse-endpoint-server.ts index 890db63..89a6294 100644 --- a/packages/mcp/mcp-connector/src/sse/sse-endpoint-server.ts +++ b/packages/mcp/mcp-connector/src/sse/sse-endpoint-server.ts @@ -6,15 +6,17 @@ import { SSEServerEndpoint } from './sse-server-endpoint'; export class SSEEndpointServer { public app: http.Server; + protected port: number; protected connectorCenter: ConnectorCenter; constructor(config: { port: number }, connectorCenter: ConnectorCenter) { this.app = http.createServer(); + this.port = config.port; this.connectorCenter = connectorCenter; - this.app.listen(config.port); } start() { + this.app.listen(this.port); this.app.on('request', (req, res) => { if (req.url === '/client') { const clientId = genId(); diff --git a/packages/mcp/mcp-connector/src/sse/sse-server-endpoint.ts b/packages/mcp/mcp-connector/src/sse/sse-server-endpoint.ts index b4298a3..108b19f 100644 --- a/packages/mcp/mcp-connector/src/sse/sse-server-endpoint.ts +++ b/packages/mcp/mcp-connector/src/sse/sse-server-endpoint.ts @@ -4,7 +4,7 @@ import { EndpointMessageType, IConnectorEndpoint, IEndpointMessage } from '../en export class SSEServerEndpoint implements IConnectorEndpoint { protected app: http.Server; - protected res: any; + protected res: http.ServerResponse; public clientId: string; public clientIdResolved: Promise; @@ -15,41 +15,39 @@ export class SSEServerEndpoint implements IConnectorEndpoint { this.clientIdResolved = Promise.resolve(clientId); } - start(): Promise { - return new Promise(() => { - this.app.on('request', (req, res) => { - if (req.url === '/message') { - res.setHeader('Access-Control-Allow-Origin', '*'); - res.setHeader('Access-Control-Allow-Methods', '*'); + async start(): Promise { + this.app.on('request', (req, res) => { + if (req.url === '/message') { + res.setHeader('Access-Control-Allow-Origin', '*'); + res.setHeader('Access-Control-Allow-Methods', '*'); - let body = ''; + let body = ''; - req.on('data', (chunk) => { - body += chunk.toString(); - }); + req.on('data', (chunk) => { + body += chunk.toString(); + }); - req.on('end', () => { - try { - const message = JSON.parse(body); - if (message.type === EndpointMessageType.INITIALIZE) { - res.end(); - return; - } - - this.onmessage?.(message); - } finally { + req.on('end', () => { + try { + const message = JSON.parse(body); + if (message.type === EndpointMessageType.INITIALIZE) { res.end(); + return; } - }); - } - }); + + this.onmessage?.(message); + } finally { + res.end(); + } + }); + } }); } async close(): Promise { this.res.end(); } async send(message: IEndpointMessage): Promise { - this.res.write(`data: ${JSON.stringify(message)}\n`); + this.res.write(`data: ${JSON.stringify(message)}\n\n`); } onmessage?: ((message: IEndpointMessage) => void) | null | undefined; onclose?: (() => void) | null | undefined; From a0247564f300f2753e5cd34c556bd79aaa9ea033 Mon Sep 17 00:00:00 2001 From: gargameljyh <1954914724@qq.com> Date: Tue, 24 Jun 2025 00:06:28 -0700 Subject: [PATCH 5/5] docs(mcp-connect): options request handle request headers --- docs/src/extensions/connector.md | 16 ++++++++++++++++ .../mcp-connector/src/sse/sse-endpoint-server.ts | 8 ++++++++ .../mcp-connector/src/sse/sse-server-endpoint.ts | 8 ++++++++ 3 files changed, 32 insertions(+) diff --git a/docs/src/extensions/connector.md b/docs/src/extensions/connector.md index 441455a..0582ade 100644 --- a/docs/src/extensions/connector.md +++ b/docs/src/extensions/connector.md @@ -46,6 +46,14 @@ class SSEServerEndpoint implements IConnectorEndpoint { async start(): Promise { // 订阅http请求 this.app.on('request', (req, res) => { + if (req.method === 'OPTIONS' && req.url === '/message') { + res.writeHead(204, { + 'Access-Control-Allow-Origin': '*', + 'Access-Control-Allow-Methods': '*', + 'Access-Control-Allow-Headers': '*', + }); + return res.end(); + } // 定义/message api 用以接收客户端的请求内容 if (req.url === '/message') { // 解决跨域问题 @@ -113,6 +121,14 @@ class SSEEndpointServer { start() { this.app.listen(config.port); this.app.on('request', (req, res) => { + if (req.method === 'OPTIONS' && req.url === '/client') { + res.writeHead(204, { + 'Access-Control-Allow-Origin': '*', + 'Access-Control-Allow-Methods': '*', + 'Access-Control-Allow-Headers': '*', + }); + return res.end(); + } // 定义/client api用以初始化ClientId if (req.url === '/client') { const clientId = genId(); diff --git a/packages/mcp/mcp-connector/src/sse/sse-endpoint-server.ts b/packages/mcp/mcp-connector/src/sse/sse-endpoint-server.ts index 89a6294..0299431 100644 --- a/packages/mcp/mcp-connector/src/sse/sse-endpoint-server.ts +++ b/packages/mcp/mcp-connector/src/sse/sse-endpoint-server.ts @@ -18,6 +18,14 @@ export class SSEEndpointServer { start() { this.app.listen(this.port); this.app.on('request', (req, res) => { + if (req.method === 'OPTIONS' && req.url === '/client') { + res.writeHead(204, { + 'Access-Control-Allow-Origin': '*', + 'Access-Control-Allow-Methods': '*', + 'Access-Control-Allow-Headers': '*', + }); + return res.end(); + } if (req.url === '/client') { const clientId = genId(); const endpoint = new SSEServerEndpoint(this.app, res, clientId); diff --git a/packages/mcp/mcp-connector/src/sse/sse-server-endpoint.ts b/packages/mcp/mcp-connector/src/sse/sse-server-endpoint.ts index 108b19f..4d4bc6e 100644 --- a/packages/mcp/mcp-connector/src/sse/sse-server-endpoint.ts +++ b/packages/mcp/mcp-connector/src/sse/sse-server-endpoint.ts @@ -17,6 +17,14 @@ export class SSEServerEndpoint implements IConnectorEndpoint { async start(): Promise { this.app.on('request', (req, res) => { + if (req.method === 'OPTIONS' && req.url === '/message') { + res.writeHead(204, { + 'Access-Control-Allow-Origin': '*', + 'Access-Control-Allow-Methods': '*', + 'Access-Control-Allow-Headers': '*', + }); + return res.end(); + } if (req.url === '/message') { res.setHeader('Access-Control-Allow-Origin', '*'); res.setHeader('Access-Control-Allow-Methods', '*');