Skip to content

Commit 552b4ac

Browse files
authored
feat(fix): proper shutdown logic k8s (#23)
Signed-off-by: Eitan Yarmush <eitan.yarmush@solo.io>
1 parent b574729 commit 552b4ac

File tree

1 file changed

+93
-23
lines changed

1 file changed

+93
-23
lines changed

mcp/src/index.ts

Lines changed: 93 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -265,13 +265,66 @@ server.tool(
265265
// --- Transport Setup ---
266266
async function main() {
267267
const transport_type = process.env.TRANSPORT_TYPE || 'http';
268+
let webserver: any = null; // Store server reference for proper shutdown
269+
270+
// Common graceful shutdown handler
271+
const createGracefulShutdownHandler = (transportCleanup: () => Promise<void>) => {
272+
return async (signal: string) => {
273+
console.error(`Received ${signal}, initiating graceful shutdown...`);
274+
275+
const shutdownTimeout = parseInt(process.env.SHUTDOWN_TIMEOUT || '5000', 10);
276+
const forceExitTimeout = setTimeout(() => {
277+
console.error(`Shutdown timeout (${shutdownTimeout}ms) exceeded, force exiting...`);
278+
process.exit(1);
279+
}, shutdownTimeout);
280+
281+
try {
282+
// Close HTTP server first to stop accepting new connections
283+
if (webserver) {
284+
await new Promise<void>((resolve, reject) => {
285+
webserver.close((err: any) => {
286+
if (err) {
287+
console.error('Error closing HTTP server:', err);
288+
reject(err);
289+
} else {
290+
console.error('HTTP server closed');
291+
resolve();
292+
}
293+
});
294+
});
295+
}
296+
297+
// Clean up transports
298+
await transportCleanup();
299+
300+
clearTimeout(forceExitTimeout);
301+
console.error('Graceful shutdown complete');
302+
process.exit(0);
303+
} catch (error) {
304+
console.error('Error during graceful shutdown:', error);
305+
clearTimeout(forceExitTimeout);
306+
process.exit(1);
307+
}
308+
};
309+
};
268310

269311
if (transport_type === 'stdio') {
270312
// Stdio transport for direct communication
271313
console.error("Starting MCP server with stdio transport...");
272314
const transport = new StdioServerTransport();
273315
await server.connect(transport);
274316
console.error("MCP server connected via stdio.");
317+
318+
// Add shutdown handler for stdio transport
319+
const shutdownHandler = createGracefulShutdownHandler(async () => {
320+
console.error('Closing stdio transport...');
321+
// StdioServerTransport doesn't have a close method, but we can clean up the connection
322+
// The transport will be cleaned up when the process exits
323+
});
324+
325+
process.on('SIGTERM', () => shutdownHandler('SIGTERM'));
326+
process.on('SIGINT', () => shutdownHandler('SIGINT'));
327+
275328
} else if (transport_type === 'sse') {
276329
// SSE transport for backward compatibility
277330
console.error("Starting MCP server with SSE transport...");
@@ -304,22 +357,26 @@ async function main() {
304357
}
305358
});
306359

360+
app.get("/health", (_: Request, res: Response) => {
361+
res.status(200).send("OK");
362+
});
363+
307364
const PORT = process.env.PORT || 3001;
308-
const webserver = app.listen(PORT, () => {
365+
webserver = app.listen(PORT, () => {
309366
console.error(`MCP server is running on port ${PORT} with SSE transport`);
310367
console.error(`Connect to: http://localhost:${PORT}/sse`);
311368
});
312369

313370
webserver.keepAliveTimeout = 3000;
314371

315372
// Keep the process alive
316-
webserver.on('error', (error) => {
373+
webserver.on('error', (error: any) => {
317374
console.error('HTTP server error:', error);
318375
});
319376

320-
// Handle server shutdown
321-
process.on('SIGINT', async () => {
322-
console.error('Shutting down SSE server...');
377+
// Handle server shutdown with proper SIGTERM/SIGINT support
378+
const shutdownHandler = createGracefulShutdownHandler(async () => {
379+
console.error('Closing SSE transports...');
323380

324381
// Close all active SSE transports
325382
for (const [sessionId, transport] of Object.entries(sseTransports)) {
@@ -331,13 +388,11 @@ async function main() {
331388
console.error(`Error cleaning up SSE transport for session ${sessionId}:`, error);
332389
}
333390
}
334-
335-
console.error('SSE server shutdown complete');
336-
process.exit(0);
337391
});
338392

339-
// Prevent the process from exiting
340-
process.stdin.resume();
393+
process.on('SIGTERM', () => shutdownHandler('SIGTERM'));
394+
process.on('SIGINT', () => shutdownHandler('SIGINT'));
395+
341396
} else if (transport_type === 'http') {
342397
// Streamable HTTP transport for web-based communication
343398
console.error("Starting MCP server with HTTP transport...");
@@ -476,39 +531,54 @@ async function main() {
476531
});
477532

478533
const PORT = process.env.PORT || 3001;
479-
const webserver = app.listen(PORT, () => {
534+
webserver = app.listen(PORT, () => {
480535
console.error(`MCP server is running on port ${PORT} with HTTP transport`);
481536
console.error(`Connect to: http://localhost:${PORT}/mcp`);
482537
});
483538

484539
webserver.keepAliveTimeout = 3000;
485540

486541
// Keep the process alive
487-
webserver.on('error', (error) => {
542+
webserver.on('error', (error: any) => {
488543
console.error('HTTP server error:', error);
489544
});
490545

491-
// Handle server shutdown
492-
process.on('SIGINT', async () => {
493-
console.error('Shutting down server...');
546+
// Handle server shutdown with proper SIGTERM/SIGINT support and timeout
547+
const shutdownHandler = createGracefulShutdownHandler(async () => {
548+
console.error('Closing HTTP transports...');
494549

495-
// Close all active transports to properly clean up resources
496-
for (const [sessionId, transport] of transports) {
550+
// Close all active transports with individual timeouts
551+
const transportClosePromises = Array.from(transports.entries()).map(async ([sessionId, transport]) => {
497552
try {
498553
console.error(`Closing transport for session ${sessionId}`);
499-
await transport.close();
554+
555+
// Add timeout to individual transport close operations
556+
const closeTimeout = new Promise<void>((_, reject) => {
557+
setTimeout(() => reject(new Error(`Transport close timeout for ${sessionId}`)), 2000);
558+
});
559+
560+
await Promise.race([
561+
transport.close(),
562+
closeTimeout
563+
]);
564+
500565
transports.delete(sessionId);
566+
console.error(`Transport closed for session ${sessionId}`);
501567
} catch (error) {
502568
console.error(`Error closing transport for session ${sessionId}:`, error);
569+
// Still remove from map even if close failed
570+
transports.delete(sessionId);
503571
}
504-
}
572+
});
505573

506-
console.error('Server shutdown complete');
507-
process.exit(0);
574+
// Wait for all transports to close, but with overall timeout handled by outer function
575+
await Promise.allSettled(transportClosePromises);
576+
console.error('All transports cleanup completed');
508577
});
509578

510-
// Prevent the process from exiting
511-
process.stdin.resume();
579+
process.on('SIGTERM', () => shutdownHandler('SIGTERM'));
580+
process.on('SIGINT', () => shutdownHandler('SIGINT'));
581+
512582
} else {
513583
console.error(`Unknown transport type: ${transport_type}. Use 'stdio', 'sse', or 'http'.`);
514584
process.exit(1);

0 commit comments

Comments
 (0)