From c81bbf1c80ef4b007b144c80c3438be5aa1c2a1e Mon Sep 17 00:00:00 2001 From: KevinChenCodexMini Date: Mon, 9 Mar 2026 19:16:43 +0800 Subject: [PATCH 1/9] Add Happy Codex resume workflow --- packages/happy-cli/src/api/api.ts | 62 +- .../src/codex/codexAppServerClient.ts | 710 ++++++++++++++++++ .../happy-cli/src/codex/codexMcpClient.ts | 16 +- packages/happy-cli/src/codex/runCodex.ts | 378 +++++++++- packages/happy-cli/src/index.ts | 291 +++++++ 5 files changed, 1429 insertions(+), 28 deletions(-) create mode 100644 packages/happy-cli/src/codex/codexAppServerClient.ts diff --git a/packages/happy-cli/src/api/api.ts b/packages/happy-cli/src/api/api.ts index fc38118043..e85bbf12c8 100644 --- a/packages/happy-cli/src/api/api.ts +++ b/packages/happy-cli/src/api/api.ts @@ -9,6 +9,9 @@ import { configuration } from '@/configuration'; import chalk from 'chalk'; import { Credentials } from '@/persistence'; import { connectionState, isNetworkError } from '@/utils/serverConnectionErrors'; +import { existsSync, readFileSync } from 'node:fs'; +import { homedir } from 'node:os'; +import { join } from 'node:path'; export class ApiClient { @@ -33,15 +36,66 @@ export class ApiClient { state: AgentState | null }): Promise { + const loadRestoreSessionSnapshot = (): { encryptionKey: Uint8Array; encryptionVariant: 'dataKey' } | null => { + const restoreSessionId = process.env.HAPPY_RESTORE_SESSION_ID; + const restoreSessionTag = process.env.HAPPY_RESTORE_SESSION_TAG || process.env.HAPPY_SESSION_TAG_OVERRIDE; + if (!restoreSessionId && !restoreSessionTag) { + return null; + } + + const candidates: string[] = []; + if (restoreSessionId) { + candidates.push(join(homedir(), '.happy-session-crypto', `session-${restoreSessionId}.json`)); + } + if (restoreSessionTag) { + candidates.push(join(homedir(), '.happy-session-crypto', `tag-${restoreSessionTag}.json`)); + } + + for (const candidatePath of candidates) { + try { + if (!existsSync(candidatePath)) { + continue; + } + const parsed = JSON.parse(readFileSync(candidatePath, 'utf8')); + if (parsed?.encryptionVariant !== 'dataKey' || !parsed?.encryptionKeyBase64) { + continue; + } + const restoredKey = Uint8Array.from(Buffer.from(parsed.encryptionKeyBase64, 'base64')); + if (restoredKey.length !== 32) { + continue; + } + logger.debug('[SessionCrypto] Reusing saved session encryption key for restore', { + source: candidatePath, + sessionId: parsed.sessionId, + encryptionVariant: parsed.encryptionVariant + }); + return { + encryptionKey: restoredKey, + encryptionVariant: parsed.encryptionVariant + }; + } catch (error) { + logger.debug(`[SessionCrypto] Failed loading restore snapshot from ${candidatePath}`, error); + } + } + + return null; + }; + + const restoreSnapshot = loadRestoreSessionSnapshot(); + // Resolve encryption key let dataEncryptionKey: Uint8Array | null = null; let encryptionKey: Uint8Array; let encryptionVariant: 'legacy' | 'dataKey'; if (this.credential.encryption.type === 'dataKey') { - - // Generate new encryption key - encryptionKey = getRandomBytes(32); - encryptionVariant = 'dataKey'; + if (restoreSnapshot) { + encryptionKey = restoreSnapshot.encryptionKey; + encryptionVariant = restoreSnapshot.encryptionVariant; + } else { + // Generate new encryption key + encryptionKey = getRandomBytes(32); + encryptionVariant = 'dataKey'; + } // Derive and encrypt data encryption key // const contentDataKey = await deriveKey(this.secret, 'Happy EnCoder', ['content']); diff --git a/packages/happy-cli/src/codex/codexAppServerClient.ts b/packages/happy-cli/src/codex/codexAppServerClient.ts new file mode 100644 index 0000000000..9468e81465 --- /dev/null +++ b/packages/happy-cli/src/codex/codexAppServerClient.ts @@ -0,0 +1,710 @@ +import { spawn, execSync, type ChildProcessWithoutNullStreams } from 'node:child_process'; +import { randomUUID } from 'node:crypto'; +import { logger } from '@/ui/logger'; +import type { CodexPermissionHandler } from './utils/permissionHandler'; +import type { CodexSessionConfig, CodexToolResponse } from './types'; + +const DEFAULT_TIMEOUT = 14 * 24 * 60 * 60 * 1000; + +function createAbortError(): Error { + const error = new Error('The operation was aborted'); + error.name = 'AbortError'; + return error; +} + +function mapPermissionResultToApprovalDecision(result: any, kind: 'command' | 'file'): any { + if (result?.decision === 'approved_for_session') { + return 'acceptForSession'; + } + if (result?.decision === 'approved_execpolicy_amendment' && kind === 'command' && result.execPolicyAmendment?.command?.length) { + return { + acceptWithExecpolicyAmendment: { + execpolicy_amendment: result.execPolicyAmendment.command + } + }; + } + if (result?.decision === 'abort') { + return 'cancel'; + } + if (result?.decision === 'approved' || result?.decision === 'approved_execpolicy_amendment') { + return 'accept'; + } + return 'decline'; +} + +function mapSandboxModeToPolicy(mode: CodexSessionConfig['sandbox']) { + switch (mode) { + case 'read-only': + return { + type: 'readOnly', + access: { type: 'fullAccess' }, + networkAccess: false + }; + case 'danger-full-access': + return { + type: 'dangerFullAccess' + }; + case 'workspace-write': + default: + return { + type: 'workspaceWrite', + writableRoots: [], + readOnlyAccess: { type: 'fullAccess' }, + networkAccess: false, + excludeTmpdirEnvVar: false, + excludeSlashTmp: false + }; + } +} + +function stringifyError(error: unknown): unknown { + if (error instanceof Error) { + return { + name: error.name, + message: error.message, + stack: error.stack + }; + } + return error; +} + +function ensureCodexCliAvailable(): void { + try { + execSync('codex --version', { stdio: 'ignore' }); + } catch { + throw new Error( + 'Codex CLI not found or not executable.\n' + + '\n' + + 'To install codex:\n' + + ' npm install -g @openai/codex\n' + + '\n' + + 'Alternatively, use Claude:\n' + + ' happy claude' + ); + } +} + +type PendingRequest = { + resolve: (value: any) => void; + reject: (error: Error) => void; + timer: NodeJS.Timeout; +}; + +type PendingTurn = { + resolve: (value: CodexToolResponse & { structuredContent?: any }) => void; + reject: (error: Error) => void; + timer: NodeJS.Timeout; +}; + +export class CodexAppServerClient { + private child: ChildProcessWithoutNullStreams | null = null; + private connected = false; + private sessionId: string | null = null; + private conversationId: string | null = null; + private handler: ((event: any) => void) | null = null; + private permissionHandler: CodexPermissionHandler | null = null; + private requestCounter = 0; + private stdoutBuffer = ''; + private pendingRequests = new Map(); + private pendingTurns = new Map(); + private lastAgentMessages = new Map(); + private resumed = false; + private threadConfig: Partial = {}; + + public sandboxEnabled = false; + + setHandler(handler: ((event: any) => void) | null): void { + this.handler = handler; + } + + setPermissionHandler(handler: CodexPermissionHandler): void { + this.permissionHandler = handler; + } + + seedSessionIdentifiers(sessionId: string | null, conversationId: string | null): void { + const normalizedSessionId = typeof sessionId === 'string' && sessionId.length > 0 ? sessionId : null; + const normalizedConversationId = + typeof conversationId === 'string' && conversationId.length > 0 ? conversationId : normalizedSessionId; + this.sessionId = normalizedSessionId; + this.conversationId = normalizedConversationId; + this.resumed = false; + logger.debug('[CodexAppServer] Session identifiers seeded', { + sessionId: this.sessionId, + conversationId: this.conversationId + }); + } + + getSessionId(): string | null { + return this.sessionId; + } + + hasActiveSession(): boolean { + return this.sessionId !== null; + } + + clearSession(): void { + const previousSessionId = this.sessionId; + this.sessionId = null; + this.conversationId = null; + this.resumed = false; + this.threadConfig = {}; + logger.debug('[CodexAppServer] Session cleared, previous sessionId:', previousSessionId); + } + + storeSessionForResume(): string | null { + logger.debug('[CodexAppServer] Storing session for potential resume:', this.sessionId); + return this.sessionId; + } + + async forceCloseSession(): Promise { + logger.debug('[CodexAppServer] Force closing session'); + try { + await this.disconnect(); + } finally { + this.clearSession(); + } + logger.debug('[CodexAppServer] Session force-closed'); + } + + async disconnect(): Promise { + if (!this.child) { + this.connected = false; + return; + } + + const child = this.child; + this.child = null; + this.connected = false; + + try { + child.kill('SIGTERM'); + } catch { + } + + await new Promise((resolve) => { + const timer = setTimeout(() => resolve(), 500); + child.once('exit', () => { + clearTimeout(timer); + resolve(); + }); + }); + } + + async connect(): Promise { + if (this.connected) return; + + ensureCodexCliAvailable(); + logger.debug('[CodexAppServer] Connecting using command: codex app-server --listen stdio://'); + + const transportEnv = Object.keys(process.env).reduce((acc, key) => { + const value = process.env[key]; + if (typeof value === 'string') { + acc[key] = value; + } + return acc; + }, {} as Record); + + this.child = spawn('codex', ['app-server', '--listen', 'stdio://'], { + env: transportEnv, + stdio: ['pipe', 'pipe', 'pipe'] + }); + + this.child.stdout.on('data', (chunk) => { + this.handleStdoutChunk(chunk.toString('utf8')); + }); + + this.child.stderr.on('data', (chunk) => { + const text = chunk.toString('utf8').trim(); + if (text) { + logger.debug('[CodexAppServer][stderr]', text); + } + }); + + this.child.on('exit', (code, signal) => { + logger.debug('[CodexAppServer] Process exited', { code, signal }); + const error = new Error(`Codex app-server exited (code=${code ?? 'null'}, signal=${signal ?? 'null'})`); + for (const { reject, timer } of this.pendingRequests.values()) { + clearTimeout(timer); + reject(error); + } + this.pendingRequests.clear(); + for (const { reject, timer } of this.pendingTurns.values()) { + clearTimeout(timer); + reject(error); + } + this.pendingTurns.clear(); + this.connected = false; + this.child = null; + }); + + this.child.on('error', (error) => { + logger.debug('[CodexAppServer] Process error', stringifyError(error)); + }); + + await this.request('initialize', { + clientInfo: { + name: 'happy-codex-app-server-client', + version: '1.0.0' + }, + capabilities: { + experimentalApi: true + } + }); + + this.connected = true; + logger.debug('[CodexAppServer] Connected'); + } + + private handleStdoutChunk(chunk: string): void { + this.stdoutBuffer += chunk; + + while (true) { + const newlineIndex = this.stdoutBuffer.indexOf('\n'); + if (newlineIndex === -1) { + break; + } + const line = this.stdoutBuffer.slice(0, newlineIndex).trim(); + this.stdoutBuffer = this.stdoutBuffer.slice(newlineIndex + 1); + if (!line) { + continue; + } + + try { + const message = JSON.parse(line); + this.dispatchMessage(message); + } catch (error) { + logger.debug('[CodexAppServer] Failed to parse stdout line', { line, error: stringifyError(error) }); + } + } + } + + private dispatchMessage(message: any): void { + if (message && typeof message === 'object' && 'id' in message && ('result' in message || 'error' in message)) { + const pending = this.pendingRequests.get(message.id); + if (!pending) { + return; + } + clearTimeout(pending.timer); + this.pendingRequests.delete(message.id); + if ('error' in message) { + pending.reject(new Error(message.error?.message || 'App server request failed')); + } else { + pending.resolve(message.result); + } + return; + } + + if (message && typeof message === 'object' && 'id' in message && 'method' in message) { + void this.handleServerRequest(message); + return; + } + + if (message && typeof message === 'object' && 'method' in message) { + this.handleNotification(message); + } + } + + private send(message: unknown): void { + if (!this.child?.stdin) { + throw new Error('Codex app-server stdin is not available'); + } + this.child.stdin.write(`${JSON.stringify(message)}\n`); + } + + private request(method: string, params: unknown, options?: { timeout?: number; signal?: AbortSignal }): Promise { + const id = `${method}-${++this.requestCounter}-${randomUUID()}`; + const timeoutMs = options?.timeout ?? DEFAULT_TIMEOUT; + + if (options?.signal?.aborted) { + return Promise.reject(createAbortError()); + } + + return new Promise((resolve, reject) => { + let settled = false; + + const cleanupAbort = () => { + options?.signal?.removeEventListener('abort', onAbort); + }; + + const settleReject = (error: Error) => { + if (settled) { + return; + } + settled = true; + cleanupAbort(); + clearTimeout(timer); + this.pendingRequests.delete(id); + reject(error); + }; + + const settleResolve = (value: unknown) => { + if (settled) { + return; + } + settled = true; + cleanupAbort(); + clearTimeout(timer); + this.pendingRequests.delete(id); + resolve(value); + }; + + const onAbort = () => { + settleReject(createAbortError()); + }; + + const timer = setTimeout(() => { + settleReject(new Error(`Codex app-server request timed out: ${method}`)); + }, timeoutMs); + + if (options?.signal) { + options.signal.addEventListener('abort', onAbort, { once: true }); + } + + this.pendingRequests.set(id, { resolve: settleResolve, reject: settleReject, timer }); + this.send({ + id, + method, + params + }); + }); + } + + private async handleServerRequest(message: any): Promise { + try { + let result: unknown; + switch (message.method) { + case 'item/commandExecution/requestApproval': + result = await this.handleCommandApproval(message.params); + break; + case 'item/fileChange/requestApproval': + result = await this.handleFileChangeApproval(message.params); + break; + case 'item/tool/call': + result = { + success: false, + contentItems: [ + { + type: 'inputText', + text: `Dynamic tool calls are not supported by Happy's app-server restore path: ${message.params?.tool || 'unknown'}` + } + ] + }; + break; + case 'item/tool/requestUserInput': + result = { answers: {} }; + break; + case 'mcpServer/elicitation/request': + result = { action: 'decline' }; + break; + default: + throw new Error(`Unsupported app-server request method: ${message.method}`); + } + + this.send({ + id: message.id, + result + }); + } catch (error) { + logger.debug('[CodexAppServer] Error handling server request', { + method: message?.method, + error: stringifyError(error) + }); + this.send({ + id: message.id, + error: { + code: -32000, + message: error instanceof Error ? error.message : 'Unknown app-server request error' + } + }); + } + } + + private async handleCommandApproval(params: any): Promise<{ decision: any }> { + if (!this.permissionHandler) { + return { decision: 'decline' }; + } + + const permissionId = String(params.approvalId ?? params.itemId ?? randomUUID()); + const command = Array.isArray(params.command) + ? params.command.join(' ') + : typeof params.command === 'string' + ? params.command + : ''; + const result = await this.permissionHandler.handleToolCall(permissionId, 'CodexBash', { + command, + cwd: params.cwd, + reason: params.reason, + commandActions: params.commandActions, + proposedExecpolicyAmendment: params.proposedExecpolicyAmendment + }); + + return { + decision: mapPermissionResultToApprovalDecision(result, 'command') + }; + } + + private async handleFileChangeApproval(params: any): Promise<{ decision: any }> { + if (!this.permissionHandler) { + return { decision: 'decline' }; + } + const permissionId = String(params.itemId ?? randomUUID()); + const result = await this.permissionHandler.handleToolCall(permissionId, 'CodexPatch', params); + return { + decision: mapPermissionResultToApprovalDecision(result, 'file') + }; + } + + private handleNotification(notification: any): void { + if (notification.method === 'turn/completed') { + this.resolveTurn(notification.params?.turn?.id || notification.params?.turnId); + return; + } + + if (!notification.method?.startsWith('codex/event/')) { + return; + } + + const params = notification.params || {}; + const msg = params.msg; + if (!msg || typeof msg !== 'object') { + return; + } + + const threadId = typeof msg.thread_id === 'string' && msg.thread_id.length > 0 ? msg.thread_id : null; + const conversationId = + typeof params.conversationId === 'string' && params.conversationId.length > 0 + ? params.conversationId + : threadId; + + if (threadId) { + this.sessionId = threadId; + } + if (conversationId) { + this.conversationId = conversationId; + } + + if (msg.type === 'agent_message') { + const turnId = typeof params.id === 'string' && params.id.length > 0 ? params.id : msg.turn_id; + if (turnId && typeof msg.message === 'string') { + this.lastAgentMessages.set(turnId, msg.message); + } + } else if (msg.type === 'task_complete') { + const turnId = typeof msg.turn_id === 'string' && msg.turn_id.length > 0 ? msg.turn_id : params.id; + this.resolveTurn(turnId, msg.last_agent_message); + } + + this.handler?.(msg); + } + + private resolveTurn(turnId: string | null | undefined, finalMessage?: string): void { + if (!turnId) { + return; + } + const pending = this.pendingTurns.get(turnId); + if (!pending) { + return; + } + + clearTimeout(pending.timer); + this.pendingTurns.delete(turnId); + const message = typeof finalMessage === 'string' && finalMessage.length > 0 + ? finalMessage + : this.lastAgentMessages.get(turnId) || ''; + this.lastAgentMessages.delete(turnId); + + pending.resolve({ + content: message ? [{ type: 'text', text: message }] : [], + structuredContent: { + threadId: this.sessionId, + content: message + } + }); + } + + private buildThreadParams(config?: Partial): Record { + const params: Record = {}; + if (config?.cwd) { + params.cwd = config.cwd; + } + if (config?.config) { + params.config = config.config; + } + if (config?.['approval-policy']) { + params.approvalPolicy = config['approval-policy']; + } + if (config?.sandbox) { + params.sandbox = config.sandbox; + } + if (config?.model) { + params.model = config.model; + } + return params; + } + + private buildTurnParams(prompt: string, config?: Partial): Record { + const params: Record = { + threadId: this.sessionId, + input: [ + { + type: 'text', + text: prompt + } + ] + }; + if (config?.cwd) { + params.cwd = config.cwd; + } + if (config?.['approval-policy']) { + params.approvalPolicy = config['approval-policy']; + } + if (config?.sandbox) { + params.sandboxPolicy = mapSandboxModeToPolicy(config.sandbox); + } + if (config?.model) { + params.model = config.model; + } + return params; + } + + private async ensureThreadResumed(config?: Partial, options?: { timeout?: number; signal?: AbortSignal }): Promise { + if (this.resumed) { + return; + } + if (!this.sessionId) { + throw new Error('No saved Codex thread id to resume'); + } + + const response = await this.request('thread/resume', { + threadId: this.sessionId, + ...this.buildThreadParams(config) + }, options); + const threadId = response?.thread?.id; + if (typeof threadId === 'string' && threadId.length > 0) { + this.sessionId = threadId; + this.conversationId = threadId; + } + this.resumed = true; + this.threadConfig = { ...this.threadConfig, ...config }; + logger.debug('[CodexAppServer] Resumed thread', { threadId: this.sessionId }); + } + + private async waitForTurn(turnId: string | null | undefined, options?: { timeout?: number; signal?: AbortSignal }): Promise { + if (!turnId) { + return { + content: [], + structuredContent: { + threadId: this.sessionId, + content: '' + } + }; + } + + if (options?.signal?.aborted) { + void this.interruptTurn(turnId); + throw createAbortError(); + } + + return new Promise((resolve, reject) => { + let settled = false; + + const cleanupAbort = () => { + options?.signal?.removeEventListener('abort', onAbort); + }; + + const settleReject = (error: Error) => { + if (settled) { + return; + } + settled = true; + cleanupAbort(); + clearTimeout(timer); + this.pendingTurns.delete(turnId); + reject(error); + }; + + const settleResolve = (value: CodexToolResponse & { structuredContent?: any }) => { + if (settled) { + return; + } + settled = true; + cleanupAbort(); + clearTimeout(timer); + this.pendingTurns.delete(turnId); + resolve(value); + }; + + const onAbort = () => { + void this.interruptTurn(turnId); + settleReject(createAbortError()); + }; + + const timer = setTimeout(() => { + settleReject(new Error(`Codex app-server turn timed out: ${turnId}`)); + }, options?.timeout ?? DEFAULT_TIMEOUT); + + if (options?.signal) { + options.signal.addEventListener('abort', onAbort, { once: true }); + } + + this.pendingTurns.set(turnId, { resolve: settleResolve, reject: settleReject, timer }); + }); + } + + private async interruptTurn(turnId: string): Promise { + if (!this.sessionId || !turnId) { + return; + } + try { + await this.request('turn/interrupt', { + threadId: this.sessionId, + turnId + }, { timeout: 5000 }); + logger.debug('[CodexAppServer] Interrupted turn', { threadId: this.sessionId, turnId }); + } catch (error) { + logger.debug('[CodexAppServer] Failed to interrupt turn', { + threadId: this.sessionId, + turnId, + error: stringifyError(error) + }); + } + } + + private async runTurn(prompt: string, config?: Partial, options?: { timeout?: number; signal?: AbortSignal }): Promise { + const response = await this.request('turn/start', this.buildTurnParams(prompt, config), options); + const turnId = response?.turn?.id; + return this.waitForTurn(turnId, options); + } + + async startSession(config: CodexSessionConfig, options?: { signal?: AbortSignal }): Promise { + if (!this.connected) { + await this.connect(); + } + + const response = await this.request('thread/start', this.buildThreadParams(config), options); + const threadId = response?.thread?.id; + if (!threadId) { + throw new Error('Codex app-server did not return a thread id'); + } + + this.sessionId = threadId; + this.conversationId = threadId; + this.resumed = true; + this.threadConfig = { ...config }; + logger.debug('[CodexAppServer] Started thread', { threadId }); + return this.runTurn(config.prompt, config, options); + } + + async continueSession( + prompt: string, + options?: { signal?: AbortSignal; happyConfig?: Partial } + ): Promise { + if (!this.connected) { + await this.connect(); + } + + const config = options?.happyConfig || this.threadConfig; + await this.ensureThreadResumed(config, options); + this.threadConfig = { ...this.threadConfig, ...config }; + return this.runTurn(prompt, config, options); + } +} diff --git a/packages/happy-cli/src/codex/codexMcpClient.ts b/packages/happy-cli/src/codex/codexMcpClient.ts index 25f2e6854d..c91332d1ca 100644 --- a/packages/happy-cli/src/codex/codexMcpClient.ts +++ b/packages/happy-cli/src/codex/codexMcpClient.ts @@ -262,7 +262,7 @@ export class CodexMcpClient { return response as CodexToolResponse; } - async continueSession(prompt: string, options?: { signal?: AbortSignal }): Promise { + async continueSession(prompt: string, options?: { signal?: AbortSignal; happyConfig?: Partial }): Promise { if (!this.connected) await this.connect(); if (!this.sessionId) { @@ -292,6 +292,20 @@ export class CodexMcpClient { return response as CodexToolResponse; } + seedSessionIdentifiers(sessionId: string | null, conversationId: string | null): void { + const normalizedSessionId = typeof sessionId === 'string' && sessionId.length > 0 ? sessionId : null; + const normalizedConversationId = + typeof conversationId === 'string' && conversationId.length > 0 + ? conversationId + : normalizedSessionId; + this.sessionId = normalizedSessionId; + this.conversationId = normalizedConversationId; + logger.debug('[CodexMCP] Session identifiers seeded', { + sessionId: this.sessionId, + conversationId: this.conversationId, + }); + } + private updateIdentifiersFromEvent(event: any): void { if (!event || typeof event !== 'object') { diff --git a/packages/happy-cli/src/codex/runCodex.ts b/packages/happy-cli/src/codex/runCodex.ts index 0d51a7b974..b2d119665e 100644 --- a/packages/happy-cli/src/codex/runCodex.ts +++ b/packages/happy-cli/src/codex/runCodex.ts @@ -2,6 +2,7 @@ import { render } from "ink"; import React from "react"; import { ApiClient } from '@/api/api'; import { CodexMcpClient } from './codexMcpClient'; +import { CodexAppServerClient } from './codexAppServerClient'; import { CodexPermissionHandler } from './utils/permissionHandler'; import { ReasoningProcessor } from './utils/reasoningProcessor'; import { DiffProcessor } from './utils/diffProcessor'; @@ -31,6 +32,7 @@ import { stopCaffeinate } from "@/utils/caffeinate"; import { connectionState } from '@/utils/serverConnectionErrors'; import { setupOfflineReconnection } from '@/utils/setupOfflineReconnection'; import type { ApiSessionClient } from '@/api/apiSession'; +import type { Session as HappySession } from '@/api/types'; import { resolveCodexExecutionPolicy } from './executionPolicy'; import { mapCodexMcpMessageToSessionEnvelopes, mapCodexProcessorMessageToSessionEnvelopes } from './utils/sessionProtocolMapper'; @@ -81,7 +83,7 @@ export async function runCodex(opts: { // Define session // - const sessionTag = randomUUID(); + const sessionTag = process.env.HAPPY_SESSION_TAG_OVERRIDE || randomUUID(); // Set backend for offline warnings (before any API calls) connectionState.setBackend('Codex'); @@ -118,7 +120,156 @@ export async function runCodex(opts: { startedBy: opts.startedBy, sandbox: sandboxConfig, }); - const response = await api.getOrCreateSession({ tag: sessionTag, metadata, state }); + let currentCodexSessionId: string | null = null; + let currentCodexConversationId: string | null = null; + let bootResumeFile: string | null = null; + + const loadCodexIdentifiersFromSnapshot = (): boolean => { + const restoreSessionId = process.env.HAPPY_RESTORE_SESSION_ID; + const restoreSessionTag = process.env.HAPPY_RESTORE_SESSION_TAG || sessionTag; + const candidates: string[] = []; + if (restoreSessionId) { + candidates.push(join(os.homedir(), '.happy-session-crypto', `session-${restoreSessionId}.json`)); + } + if (restoreSessionTag) { + candidates.push(join(os.homedir(), '.happy-session-crypto', `tag-${restoreSessionTag}.json`)); + } + + for (const candidatePath of candidates) { + try { + if (!fs.existsSync(candidatePath)) { + continue; + } + const parsed = JSON.parse(fs.readFileSync(candidatePath, 'utf8')); + const sid = typeof parsed?.codexSessionId === 'string' && parsed.codexSessionId.length > 0 + ? parsed.codexSessionId + : null; + const cid = typeof parsed?.codexConversationId === 'string' && parsed.codexConversationId.length > 0 + ? parsed.codexConversationId + : null; + if (!sid && !cid) { + continue; + } + currentCodexSessionId = sid || cid; + currentCodexConversationId = cid || sid; + logger.debug('[SessionCrypto] Loaded codex identifiers from snapshot', { + source: candidatePath, + codexSessionId: currentCodexSessionId, + codexConversationId: currentCodexConversationId + }); + return true; + } catch (error) { + logger.debug(`[SessionCrypto] Failed loading codex identifiers from ${candidatePath}`, error); + } + } + + return false; + }; + + const persistSessionSnapshot = (sessionToPersist: ApiSessionClient | HappySession): void => { + try { + const rawSession = sessionToPersist as any; + const persistedSessionId = rawSession?.id || rawSession?.sessionId; + const persistedEncryptionKey = rawSession?.encryptionKey; + const persistedEncryptionVariant = rawSession?.encryptionVariant; + + if (!persistedSessionId || !persistedEncryptionKey || !persistedEncryptionVariant) { + return; + } + const snapshotDir = join(os.homedir(), '.happy-session-crypto'); + if (!fs.existsSync(snapshotDir)) { + fs.mkdirSync(snapshotDir, { recursive: true, mode: 0o700 }); + } + + const payload: Record = { + sessionId: persistedSessionId, + sessionTag, + encryptionVariant: persistedEncryptionVariant, + encryptionKeyBase64: Buffer.from(persistedEncryptionKey).toString('base64'), + savedAt: Date.now() + }; + + if (currentCodexSessionId) { + payload.codexSessionId = currentCodexSessionId; + } + if (currentCodexConversationId) { + payload.codexConversationId = currentCodexConversationId; + } + + const sessionPath = join(snapshotDir, `session-${persistedSessionId}.json`); + const tagPath = join(snapshotDir, `tag-${sessionTag}.json`); + fs.writeFileSync(sessionPath, JSON.stringify(payload, null, 2), { mode: 0o600 }); + fs.writeFileSync(tagPath, JSON.stringify(payload, null, 2), { mode: 0o600 }); + logger.debug(`[SessionCrypto] Persisted session snapshot: ${persistedSessionId} (tag: ${sessionTag})`); + } catch (error) { + logger.debug('[SessionCrypto] Failed to persist session snapshot', error); + } + }; + + const loadFallbackSessionFromSnapshot = (): HappySession | null => { + const restoreSessionId = process.env.HAPPY_RESTORE_SESSION_ID; + const restoreSessionTag = process.env.HAPPY_RESTORE_SESSION_TAG || sessionTag; + const candidates: string[] = []; + if (restoreSessionId) { + candidates.push(join(os.homedir(), '.happy-session-crypto', `session-${restoreSessionId}.json`)); + } + if (restoreSessionTag) { + candidates.push(join(os.homedir(), '.happy-session-crypto', `tag-${restoreSessionTag}.json`)); + } + + for (const candidatePath of candidates) { + try { + if (!fs.existsSync(candidatePath)) { + continue; + } + const parsed = JSON.parse(fs.readFileSync(candidatePath, 'utf8')); + if (!parsed?.sessionId || !parsed?.encryptionKeyBase64 || !parsed?.encryptionVariant) { + continue; + } + + if (typeof parsed.codexSessionId === 'string' && parsed.codexSessionId.length > 0) { + currentCodexSessionId = parsed.codexSessionId; + } + if (typeof parsed.codexConversationId === 'string' && parsed.codexConversationId.length > 0) { + currentCodexConversationId = parsed.codexConversationId; + } + if (!currentCodexSessionId && currentCodexConversationId) { + currentCodexSessionId = currentCodexConversationId; + } + if (!currentCodexConversationId && currentCodexSessionId) { + currentCodexConversationId = currentCodexSessionId; + } + + const encryptionKey = Uint8Array.from(Buffer.from(parsed.encryptionKeyBase64, 'base64')); + logger.debug(`[SessionCrypto] Loaded fallback snapshot from ${candidatePath}`); + return { + id: parsed.sessionId, + seq: 0, + metadata, + metadataVersion: 0, + agentState: state, + agentStateVersion: 0, + encryptionKey, + encryptionVariant: parsed.encryptionVariant + }; + } catch (error) { + logger.debug(`[SessionCrypto] Failed loading fallback snapshot from ${candidatePath}`, error); + } + } + + return null; + }; + + loadCodexIdentifiersFromSnapshot(); + + let response: HappySession | null = await api.getOrCreateSession({ tag: sessionTag, metadata, state }); + if (!response) { + const fallbackResponse = loadFallbackSessionFromSnapshot(); + if (fallbackResponse) { + response = fallbackResponse; + logger.debug(`[SessionCrypto] Using fallback snapshot session: ${fallbackResponse.id}`); + } + } // Handle server unreachable case - create offline stub with hot reconnection let session: ApiSessionClient; @@ -133,6 +284,16 @@ export async function runCodex(opts: { response, onSessionSwap: (newSession) => { session = newSession; + persistSessionSnapshot(newSession); + notifyDaemonSessionStarted(newSession.sessionId, metadata).then((result) => { + if (result?.error) { + logger.debug(`[START] Failed to report swapped session to daemon:`, result.error); + } else { + logger.debug(`[START] Reported swapped session ${newSession.sessionId} to daemon`); + } + }).catch((error) => { + logger.debug('[START] Failed to report swapped session to daemon (exception):', error); + }); // Update permission handler with new session to avoid stale reference if (permissionHandler) { permissionHandler.updateSession(newSession); @@ -140,6 +301,7 @@ export async function runCodex(opts: { } }); session = initialSession; + persistSessionSnapshot(initialSession); // Always report to daemon if it exists (skip if offline) if (response) { @@ -361,7 +523,11 @@ export async function runCodex(opts: { // Start Context // - const client = new CodexMcpClient(sandboxConfig); + const useRestoreClient = Boolean(currentCodexSessionId || currentCodexConversationId); + logger.debug(useRestoreClient ? '[Codex] Using app-server restore client' : '[Codex] Using MCP client'); + const client: CodexMcpClient | CodexAppServerClient = useRestoreClient + ? new CodexAppServerClient() + : new CodexMcpClient(sandboxConfig); // Helper: find Codex session transcript for a given sessionId function findCodexResumeFile(sessionId: string | null): string | null { @@ -404,6 +570,93 @@ export async function runCodex(opts: { return null; } } + + function buildResumePromptContext(resumeFile: string | null): string { + if (!resumeFile) return ''; + try { + const raw = fs.readFileSync(resumeFile, 'utf8'); + const lines = raw.split('\n').filter((line) => line.trim().length > 0); + const messages: Array<{ role: 'User' | 'Assistant'; text: string }> = []; + for (const line of lines) { + let parsed: any; + try { + parsed = JSON.parse(line); + } catch { + continue; + } + + if (parsed?.type !== 'response_item' || parsed?.payload?.type !== 'message') { + continue; + } + + const role = parsed.payload.role === 'user' + ? 'User' + : parsed.payload.role === 'assistant' + ? 'Assistant' + : null; + if (!role || !Array.isArray(parsed.payload.content)) { + continue; + } + + const text = parsed.payload.content + .map((item: any) => { + if (role === 'User' && item?.type === 'input_text' && typeof item.text === 'string') { + return item.text; + } + if (role === 'Assistant' && item?.type === 'output_text' && typeof item.text === 'string') { + return item.text; + } + return ''; + }) + .filter(Boolean) + .join('\n') + .trim(); + if (!text) { + continue; + } + + const last = messages[messages.length - 1]; + if (last && last.role === role && last.text === text) { + continue; + } + messages.push({ role, text }); + } + + if (!messages.length) { + return ''; + } + + const selected: string[] = []; + let totalChars = 0; + for (let i = messages.length - 1; i >= 0; i -= 1) { + const formatted = `${messages[i].role}: ${messages[i].text}`; + if (selected.length >= 24 || totalChars + formatted.length > 12000) { + break; + } + selected.push(formatted); + totalChars += formatted.length + 2; + } + selected.reverse(); + + return [ + 'Restored context from the previous unavailable Codex thread.', + 'Treat the following transcript as prior conversation state and preserved memory.', + ...selected + ].join('\n\n'); + } catch (error) { + logger.debug('[Codex] Failed to build resume prompt context', error); + return ''; + } + } + + if (currentCodexSessionId) { + bootResumeFile = findCodexResumeFile(currentCodexSessionId); + if (bootResumeFile) { + logger.debug('[Codex] Found resume file from session snapshot:', bootResumeFile); + } else { + logger.debug(`[Codex] No resume file found for snapshot codex session ${currentCodexSessionId}`); + } + } permissionHandler = new CodexPermissionHandler(session); const reasoningProcessor = new ReasoningProcessor((message) => { const envelopes = mapCodexProcessorMessageToSessionEnvelopes(message, { currentTurnId }); @@ -419,6 +672,18 @@ export async function runCodex(opts: { }); client.setPermissionHandler(permissionHandler); client.setHandler((msg) => { + const messageThreadId = typeof msg?.thread_id === 'string' && msg.thread_id.length > 0 ? msg.thread_id : null; + if (messageThreadId && (messageThreadId !== currentCodexSessionId || messageThreadId !== currentCodexConversationId)) { + currentCodexSessionId = messageThreadId; + currentCodexConversationId = messageThreadId; + logger.debug('[SessionCrypto] Updated codex identifiers', { + sessionId: session.sessionId, + codexSessionId: currentCodexSessionId, + codexConversationId: currentCodexConversationId + }); + persistSessionSnapshot(session); + } + logger.debug(`[Codex] MCP message: ${JSON.stringify(msg)}`); // Add messages to the ink UI buffer based on message type @@ -539,6 +804,12 @@ export async function runCodex(opts: { await client.connect(); logger.debug('[codex]: client.connect done'); let wasCreated = false; + if (currentCodexSessionId || currentCodexConversationId) { + client.seedSessionIdentifiers(currentCodexSessionId, currentCodexConversationId); + wasCreated = true; + logger.debug('[Codex] Seeded client with saved identifiers; first turn will attempt continueSession'); + messageBuffer.addMessage('Attempting to resume saved Codex thread...', 'status'); + } let currentModeHash: string | null = null; let pending: { message: string; mode: EnhancedMode; isolate: boolean; hash: string } | null = null; // If we restart (e.g., mode change), use this to carry a resume file @@ -612,10 +883,10 @@ export async function runCodex(opts: { message.mode.permissionMode, sandboxManagedByHappy, ); - - if (!wasCreated) { + const startNewSession = async () => { + const basePrompt = first ? message.message + '\n\n' + CHANGE_TITLE_INSTRUCTION : message.message; const startConfig: CodexSessionConfig = { - prompt: first ? message.message + '\n\n' + CHANGE_TITLE_INSTRUCTION : message.message, + prompt: basePrompt, sandbox: executionPolicy.sandbox, 'approval-policy': executionPolicy.approvalPolicy, config: { mcp_servers: mcpServers } @@ -623,44 +894,105 @@ export async function runCodex(opts: { if (message.mode.model) { startConfig.model = message.mode.model; } - - // Check for resume file from multiple sources + let resumeFile: string | null = null; - - // Priority 1: Explicit resume file from mode change + if (nextExperimentalResume) { resumeFile = nextExperimentalResume; - nextExperimentalResume = null; // consume once + nextExperimentalResume = null; logger.debug('[Codex] Using resume file from mode change:', resumeFile); - } - // Priority 2: Resume from stored abort session - else if (storedSessionIdForResume) { + } else if (storedSessionIdForResume) { const abortResumeFile = findCodexResumeFile(storedSessionIdForResume); if (abortResumeFile) { resumeFile = abortResumeFile; logger.debug('[Codex] Using resume file from aborted session:', resumeFile); messageBuffer.addMessage('Resuming from aborted session...', 'status'); } - storedSessionIdForResume = null; // consume once + storedSessionIdForResume = null; + } else if (bootResumeFile) { + resumeFile = bootResumeFile; + bootResumeFile = null; + logger.debug('[Codex] Using resume file from session snapshot:', resumeFile); + messageBuffer.addMessage('Resuming saved context...', 'status'); } - - // Apply resume file if found + if (resumeFile) { (startConfig.config as any).experimental_resume = resumeFile; + const resumePromptContext = buildResumePromptContext(resumeFile); + if (resumePromptContext) { + startConfig.prompt = `${resumePromptContext}\n\nCurrent user message:\n${basePrompt}`; + } } - + await client.startSession( startConfig, { signal: abortController.signal } ); wasCreated = true; first = false; + }; + + if (!wasCreated) { + await startNewSession(); } else { - const response = await client.continueSession( - message.message, - { signal: abortController.signal } - ); - logger.debug('[Codex] continueSession response:', response); + try { + const continueConfig: Partial = { + sandbox: executionPolicy.sandbox, + 'approval-policy': executionPolicy.approvalPolicy + }; + if (message.mode.model) { + continueConfig.model = message.mode.model; + } + + const response = await client.continueSession( + message.message, + { + signal: abortController.signal, + happyConfig: continueConfig + } + ); + logger.debug('[Codex] continueSession response:', response); + + const continueErrorText = (() => { + const typedResponse = response as any; + if (!typedResponse?.isError) { + return ''; + } + if (typeof typedResponse?.structuredContent?.content === 'string') { + return typedResponse.structuredContent.content; + } + if (Array.isArray(typedResponse?.content)) { + return typedResponse.content + .map((item: any) => typeof item?.text === 'string' ? item.text : '') + .filter(Boolean) + .join('\n'); + } + return ''; + })(); + + if (continueErrorText && (/session not found/i.test(continueErrorText) || /thread_id/i.test(continueErrorText))) { + throw new Error(continueErrorText); + } + + first = false; + } catch (continueError) { + const continueMessage = continueError instanceof Error + ? `${continueError.name}: ${continueError.message}` + : String(continueError); + const shouldStartNewSession = + /no active session/i.test(continueMessage) || + /not found/i.test(continueMessage) || + /invalid/i.test(continueMessage); + if (!shouldStartNewSession) { + throw continueError; + } + + logger.debug('[Codex] continueSession failed; falling back to startSession', continueError); + messageBuffer.addMessage('Resume failed, starting a new session...', 'status'); + client.clearSession(); + wasCreated = false; + await startNewSession(); + } } } catch (error) { logger.warn('Error in codex session:', error); diff --git a/packages/happy-cli/src/index.ts b/packages/happy-cli/src/index.ts index ca3c031925..cebdece2b1 100644 --- a/packages/happy-cli/src/index.ts +++ b/packages/happy-cli/src/index.ts @@ -30,6 +30,10 @@ import { spawnHappyCLI } from './utils/spawnHappyCLI' import { claudeCliPath } from './claude/claudeLocal' import { execFileSync } from 'node:child_process' import { extractNoSandboxFlag } from './utils/sandboxFlags' +import { existsSync, readFileSync, readdirSync, statSync } from 'node:fs' +import { homedir } from 'node:os' +import { join, resolve } from 'node:path' +import { delay } from './utils/time' (async () => { @@ -100,6 +104,11 @@ import { extractNoSandboxFlag } from './utils/sandboxFlags' } else if (subcommand === 'codex') { // Handle codex command try { + if (args[1] === 'resume') { + await handleCodexResumeCommand(args.slice(2)); + return; + } + const { runCodex } = await import('@/codex/runCodex'); // Parse startedBy argument @@ -633,6 +642,7 @@ ${chalk.bold('Usage:')} happy [options] Start Claude with mobile control happy auth Manage authentication happy codex Start Codex mode + happy codex resume Resume Happy Codex session happy gemini Start Gemini mode (ACP) happy acp Start a generic ACP-compatible agent happy connect Connect AI vendor API keys @@ -723,6 +733,287 @@ ${chalk.bold.cyan('Claude Code Options (from `claude --help`):')} } })(); +function printCodexResumeHelp(): void { + console.log(` +${chalk.bold('happy codex resume')} - Resume Happy Codex session + +${chalk.bold('Usage:')} + happy codex resume --metadata-file + happy codex resume --path --pid + +${chalk.bold('Options:')} + --metadata-file Read Happy metadata JSON from file + --path Override metadata.path + --pid Override metadata.hostPid + --session-tag Use known session tag directly + --home-dir Override metadata.homeDir + --happy-home-dir Override metadata.happyHomeDir + --dry-run Resolve parameters and print without launching + -h, --help Show help + +${chalk.bold('Examples:')} + happy codex resume cmmexample --metadata-file ./metadata.json + happy codex resume cmmexample --path /Users/me/project --pid 12345 +`); +} + +function listHappyLogFiles(happyHomeDir: string): string[] { + try { + const logDir = join(happyHomeDir, 'logs') + return readdirSync(logDir) + .map((entry) => join(logDir, entry)) + .filter((fullPath) => { + try { + return statSync(fullPath).isFile() && fullPath.endsWith('.log') + } catch { + return false + } + }) + .sort((a, b) => { + try { + return statSync(b).mtimeMs - statSync(a).mtimeMs + } catch { + return 0 + } + }) + } catch { + return [] + } +} + +function extractSessionTagFromLog(logPath: string): string | null { + try { + const raw = readFileSync(logPath, 'utf8') + const matches = [...raw.matchAll(/tag: ([0-9a-f-]+)/g)] + .map((match) => match[1]) + .filter(Boolean) + return matches.length > 0 ? matches[matches.length - 1] : null + } catch { + return null + } +} + +function findLatestLogForPid(happyHomeDir: string, pid: string | null): string | null { + if (!pid) { + return null + } + const needle = `pid-${pid}.log` + return listHappyLogFiles(happyHomeDir).find((fullPath) => fullPath.includes(needle)) || null +} + +function findLatestLogForSessionId(happyHomeDir: string, sessionId: string): string | null { + const needle = `Session created/loaded: ${sessionId}` + for (const logPath of listHappyLogFiles(happyHomeDir)) { + try { + if (readFileSync(logPath, 'utf8').includes(needle)) { + return logPath + } + } catch { + } + } + return null +} + +function parseCodexResumeArgs(args: string[]) { + let sessionId: string | null = null + let metadataFile: string | null = null + let workdir: string | null = null + let pid: string | null = null + let sessionTag: string | null = null + let homeDir: string | null = null + let happyHomeDir: string | null = null + let dryRun = false + let showHelp = false + + for (let i = 0; i < args.length; i++) { + const arg = args[i] + if (arg === '-h' || arg === '--help') { + showHelp = true + } else if (arg === '--metadata-file') { + metadataFile = args[++i] + } else if (arg === '--path') { + workdir = args[++i] + } else if (arg === '--pid') { + pid = args[++i] + } else if (arg === '--session-tag') { + sessionTag = args[++i] + } else if (arg === '--home-dir') { + homeDir = args[++i] + } else if (arg === '--happy-home-dir') { + happyHomeDir = args[++i] + } else if (arg === '--dry-run') { + dryRun = true + } else if (!sessionId) { + sessionId = arg + } else { + throw new Error(`Unknown argument for codex resume: ${arg}`) + } + } + + return { + sessionId, + metadataFile, + workdir, + pid, + sessionTag, + homeDir, + happyHomeDir, + dryRun, + showHelp + } +} + +async function resolveCodexResumeParameters(args: string[]) { + const parsed = parseCodexResumeArgs(args) + if (parsed.showHelp) { + printCodexResumeHelp() + return { exit: 0 as number | null } + } + + if (!parsed.sessionId) { + printCodexResumeHelp() + return { exit: 1 as number | null } + } + + let metadata: Record = {} + if (parsed.metadataFile) { + try { + metadata = JSON.parse(readFileSync(resolve(parsed.metadataFile), 'utf8')) + } catch (error) { + throw new Error(`Failed to read metadata file: ${error instanceof Error ? error.message : String(error)}`) + } + } + + const homeDir = parsed.homeDir || metadata.homeDir || homedir() + const happyHomeDir = parsed.happyHomeDir || metadata.happyHomeDir || join(homeDir, '.happy') + const workdir = parsed.workdir || metadata.path + const pidValue = parsed.pid || metadata.hostPid + const hostPid = pidValue === undefined || pidValue === null || pidValue === '' ? null : String(pidValue) + + const sessionSnapshotPath = join(homeDir, '.happy-session-crypto', `session-${parsed.sessionId}.json`) + let snapshot: Record | null = null + if (existsSync(sessionSnapshotPath)) { + try { + snapshot = JSON.parse(readFileSync(sessionSnapshotPath, 'utf8')) + } catch { + } + } + + const logPathFromPid = hostPid ? findLatestLogForPid(happyHomeDir, hostPid) : null + const logPath = logPathFromPid || findLatestLogForSessionId(happyHomeDir, parsed.sessionId) + const resolvedSessionTag = parsed.sessionTag || snapshot?.sessionTag || (logPath ? extractSessionTagFromLog(logPath) : null) + + if (!workdir) { + throw new Error('Could not resolve workdir. Provide --path or metadata.path') + } + if (!existsSync(workdir)) { + throw new Error(`Workdir does not exist: ${workdir}`) + } + if (!resolvedSessionTag) { + throw new Error('Could not resolve session tag. Provide --session-tag or metadata.hostPid with available logs') + } + + const tagSnapshotPath = join(homeDir, '.happy-session-crypto', `tag-${resolvedSessionTag}.json`) + if (!existsSync(sessionSnapshotPath) && !existsSync(tagSnapshotPath)) { + throw new Error(`No restore snapshot found for session ${parsed.sessionId}. Expected ${sessionSnapshotPath} or ${tagSnapshotPath}`) + } + + const settings = await readSettings() + if (metadata.machineId && settings?.machineId && metadata.machineId !== settings.machineId) { + logger.warn(`[codex resume] metadata.machineId (${metadata.machineId}) differs from local machineId (${settings.machineId})`) + } + + return { + exit: null as number | null, + sessionId: parsed.sessionId, + sessionTag: resolvedSessionTag, + workdir, + hostPid, + homeDir, + happyHomeDir, + logPath, + sessionSnapshotPath, + tagSnapshotPath, + metadata, + dryRun: parsed.dryRun + } +} + +async function handleCodexResumeCommand(args: string[]): Promise { + const resolved = await resolveCodexResumeParameters(args) + if (resolved.exit !== null) { + process.exit(resolved.exit) + } + + const env = { + ...process.env, + HAPPY_RESTORE_SESSION_ID: resolved.sessionId, + HAPPY_RESTORE_SESSION_TAG: resolved.sessionTag, + HAPPY_SESSION_TAG_OVERRIDE: resolved.sessionTag + } + + const commandPreview = + `cd ${JSON.stringify(resolved.workdir)} && ` + + `HAPPY_RESTORE_SESSION_ID=${JSON.stringify(resolved.sessionId)} ` + + `HAPPY_RESTORE_SESSION_TAG=${JSON.stringify(resolved.sessionTag)} ` + + `HAPPY_SESSION_TAG_OVERRIDE=${JSON.stringify(resolved.sessionTag)} ` + + `happy codex --happy-starting-mode remote --started-by daemon` + + if (resolved.dryRun) { + console.log('Resolved Happy Codex resume parameters:') + console.log(` Session ID: ${resolved.sessionId}`) + console.log(` Session tag: ${resolved.sessionTag}`) + console.log(` Workdir: ${resolved.workdir}`) + console.log(` Home dir: ${resolved.homeDir}`) + console.log(` Happy home: ${resolved.happyHomeDir}`) + console.log(` Prior log: ${resolved.logPath || 'not found'}`) + console.log(` Session snapshot: ${resolved.sessionSnapshotPath}`) + console.log(` Tag snapshot: ${resolved.tagSnapshotPath}`) + console.log('') + console.log(commandPreview) + return + } + + try { + const sessions = await listDaemonSessions() + const existing = sessions.find((session) => session.happySessionId === resolved.sessionId) + if (existing) { + console.log(`Stopping existing session ${resolved.sessionId} (pid ${existing.pid})...`) + await stopDaemonSession(resolved.sessionId!) + await delay(500) + } + } catch { + } + + const child = spawnHappyCLI( + ['codex', '--happy-starting-mode', 'remote', '--started-by', 'daemon'], + { + cwd: resolved.workdir, + detached: true, + stdio: 'ignore', + env + } + ) + child.unref() + + if (!child.pid) { + throw new Error('Failed to spawn happy codex resume process') + } + + await delay(1200) + const newLogPath = findLatestLogForPid(resolved.happyHomeDir, String(child.pid)) + + console.log('Happy Codex resume started') + console.log(` Session ID: ${resolved.sessionId}`) + console.log(` Session tag: ${resolved.sessionTag}`) + console.log(` Workdir: ${resolved.workdir}`) + console.log(` PID: ${child.pid}`) + if (newLogPath) { + console.log(` Log: ${newLogPath}`) + } + console.log(` Command: ${commandPreview}`) +} + /** * Handle notification command From 78330331d1a931de6d5c340d8a255061d2b7b56d Mon Sep 17 00:00:00 2001 From: KevinChenCodexMini Date: Mon, 9 Mar 2026 20:25:49 +0800 Subject: [PATCH 2/9] Add fork guide for Happy Codex resume --- docs/happy-codex-resume-fork.md | 239 ++++++++++++++++++++++++++++++++ 1 file changed, 239 insertions(+) create mode 100644 docs/happy-codex-resume-fork.md diff --git a/docs/happy-codex-resume-fork.md b/docs/happy-codex-resume-fork.md new file mode 100644 index 0000000000..3c7a4e12ae --- /dev/null +++ b/docs/happy-codex-resume-fork.md @@ -0,0 +1,239 @@ +# Happy Codex Resume Fork + +## 概述 + +这个 fork 用来承载 `Happy Codex` 的会话恢复能力增强,目标是让已经关闭的 Happy Codex 会话可以在重新拉起后,尽量保留原来的对话记忆和上下文,而不是只恢复一个空壳会话。 + +当前 fork 信息: + +- Fork 仓库:`https://github.com/KevinCJM/happy` +- 工作分支:`codex/happy-codex-resume` +- 变更提交:`c81bbf1c80ef4b007b144c80c3438be5aa1c2a1e` + +建议用途: + +- 作为你自己的可维护分支继续迭代 +- 作为向上游发 PR 的基础 +- 作为给其他人复现 Happy Codex resume 方案的参考实现 + +## 这个 fork 解决了什么问题 + +原始问题主要有三类: + +1. Happy 会话恢复后在线,但 App 发消息没有响应 +2. 恢复后没有原来的对话记忆和上下文 +3. 需要手工拼环境变量和日志参数,恢复流程不稳定 + +这个 fork 解决后的行为是: + +- 恢复时复用旧 Happy 会话加密 key +- 恢复时读取并保存 Codex session/conversation 标识 +- 恢复路径优先走 Codex 原生 `app-server` 的 `thread/resume` +- 如果旧 thread 不可直接继续,则回退到本地 transcript 恢复 +- 提供正式命令:`happy codex resume` + +## 核心变更 + +### 1. 新增 CLI 命令 + +新增命令: + +```bash +happy codex resume --metadata-file +``` + +也支持: + +```bash +happy codex resume --path --pid +``` + +命令职责: + +- 从 metadata 和本地日志解析恢复参数 +- 自动定位 `sessionTag` +- 自动检查本地恢复快照 +- 自动停掉同一 Happy 会话的旧进程 +- 自动拉起恢复后的 Happy Codex 进程 + +实现入口: + +- `packages/happy-cli/src/index.ts` + +### 2. 修复恢复时的加密 key 复用 + +恢复模式下,如果 Happy 会话重新生成新的 `dataKey`,App 端已有消息就会解密失败,表现为在线但没回复。 + +当前实现改为: + +- 读取 `~/.happy-session-crypto/session-.json` +- 或 `~/.happy-session-crypto/tag-.json` +- 如果找到旧 `dataKey`,直接复用 + +这样恢复后的进程与原 Happy 会话仍然使用同一把会话 key。 + +实现位置: + +- `packages/happy-cli/src/api/api.ts` + +### 3. 增加 Happy 会话快照 + +当前分支会把以下内容持久化到本地快照: + +- `sessionId` +- `sessionTag` +- `encryptionVariant` +- `encryptionKeyBase64` +- `codexSessionId` +- `codexConversationId` + +作用: + +- 下次恢复时不用只靠旧日志 +- 能直接找到原会话对应的 Codex 标识 +- 允许在 Happy 服务端不可达时回退到本地快照恢复 + +实现位置: + +- `packages/happy-cli/src/codex/runCodex.ts` + +### 4. 恢复时优先走 Codex 原生 app-server + +普通新会话仍然走现有 MCP 路径。 +恢复场景则切到新的原生恢复客户端,直接调用: + +- `thread/resume` +- `turn/start` +- `turn/interrupt` + +这是当前恢复原上下文最关键的改动。 + +实现位置: + +- `packages/happy-cli/src/codex/codexAppServerClient.ts` +- `packages/happy-cli/src/codex/runCodex.ts` + +### 5. 旧 thread 无法继续时的回退策略 + +如果旧 Codex thread 不能直接续接,这个 fork 不会直接失败,而是: + +1. 从本地 `~/.codex/sessions` 查找 rollout 文件 +2. 提取最近的用户/助手对话 +3. 把 transcript 作为恢复上下文注入新 session +4. 同时继续使用 `experimental_resume` + +这一步不能保证“云端原 thread 原样复活”,但能最大化保留本地已有记忆和工作上下文。 + +实现位置: + +- `packages/happy-cli/src/codex/runCodex.ts` + +## 变更文件 + +本 fork 当前主要改动文件: + +- `packages/happy-cli/src/index.ts` +- `packages/happy-cli/src/api/api.ts` +- `packages/happy-cli/src/codex/runCodex.ts` +- `packages/happy-cli/src/codex/codexMcpClient.ts` +- `packages/happy-cli/src/codex/codexAppServerClient.ts` + +## 如何获取这个 fork + +如果是在另一台机器上复现: + +```bash +git clone https://github.com/KevinCJM/happy.git +cd happy +git checkout codex/happy-codex-resume +``` + +如果已经有上游仓库本地副本: + +```bash +git remote add fork https://github.com/KevinCJM/happy.git +git fetch fork +git switch -c codex/happy-codex-resume --track fork/codex/happy-codex-resume +``` + +## 如何构建 + +在仓库根目录执行: + +```bash +npx --yes yarn@1.22.22 install +npx --yes yarn@1.22.22 workspace happy-coder build +``` + +## 如何验证 + +### 验证命令是否接入 + +```bash +cd packages/happy-cli +node ./dist/index.mjs codex resume --help +``` + +### 验证恢复逻辑 + +前提: + +- 目标 Happy session 的本地快照仍在 +- 对应的本地 Codex rollout 文件仍在 +- 当前机器仍是原机器 + +示例: + +```bash +happy codex resume --metadata-file ./metadata.json +``` + +重点观察: + +- 进程是否成功启动 +- App 端是否能正常发消息 +- 恢复后的回答是否带有原来的记忆上下文 + +## 向上游发 PR + +当前 fork 分支已经可以直接用于 PR。 + +PR 创建入口: + +- `https://github.com/KevinCJM/happy/pull/new/codex/happy-codex-resume` + +如果你要提交到上游 `slopus/happy`,建议 PR 标题可以用: + +```text +Add Happy Codex resume workflow +``` + +建议 PR 描述至少包含: + +- 新增 `happy codex resume` +- 恢复时复用会话加密 key +- 保存并恢复 Codex session identifiers +- restore 场景优先使用原生 `app-server thread/resume` +- fallback 到本地 transcript/context 恢复 + +## 当前限制 + +这个 fork 的目标是“恢复之前的对话记忆和上下文”,不是保证“服务端原 thread 永久可续接”。 + +需要明确两点: + +1. 旧 Codex thread 可能已经不可继续 +2. 即使旧 thread 不可继续,本 fork 仍会尽量利用本地 transcript 恢复上下文 + +也就是说,这个方案优先保证“恢复体验”,不是强依赖旧服务端 thread 一定存在。 + +## 结论 + +这个 fork 已经把 Happy Codex 的恢复流程从“手工调试级别”推进到了“可命令化、可重复、可分享”的状态。 + +对外可以这样描述这个分支: + +- 它增加了 `happy codex resume` +- 它能恢复 Happy 会话层加密上下文 +- 它能尽量恢复原来的 Codex 对话记忆和工作上下文 +- 它在旧 thread 失效时仍然有本地 transcript fallback From 73121d5c3f6c6009d2d6826a58374b675f11682b Mon Sep 17 00:00:00 2001 From: KevinChenCodexMini Date: Mon, 9 Mar 2026 21:05:34 +0800 Subject: [PATCH 3/9] Prune stale daemon sessions on demand --- .../src/daemon/controlServer.test.ts | 86 +++++++++++++++++++ .../happy-cli/src/daemon/controlServer.ts | 10 ++- .../src/daemon/daemon.integration.test.ts | 72 +++++++++++++--- packages/happy-cli/src/daemon/run.ts | 23 ++--- .../src/daemon/sessionTracking.test.ts | 44 ++++++++++ .../happy-cli/src/daemon/sessionTracking.ts | 34 ++++++++ 6 files changed, 244 insertions(+), 25 deletions(-) create mode 100644 packages/happy-cli/src/daemon/controlServer.test.ts create mode 100644 packages/happy-cli/src/daemon/sessionTracking.test.ts create mode 100644 packages/happy-cli/src/daemon/sessionTracking.ts diff --git a/packages/happy-cli/src/daemon/controlServer.test.ts b/packages/happy-cli/src/daemon/controlServer.test.ts new file mode 100644 index 0000000000..8f1670ac26 --- /dev/null +++ b/packages/happy-cli/src/daemon/controlServer.test.ts @@ -0,0 +1,86 @@ +import { afterEach, describe, expect, it, vi } from 'vitest'; + +import { startDaemonControlServer } from './controlServer'; +import { TrackedSession } from './types'; +import { SpawnSessionResult } from '@/modules/common/registerCommonHandlers'; + +describe('startDaemonControlServer', () => { + let stopServer: (() => Promise) | null = null; + const spawnSession = vi.fn(async (): Promise => ({ + type: 'success', + sessionId: 'alive' + })); + + afterEach(async () => { + if (stopServer) { + await stopServer(); + stopServer = null; + } + spawnSession.mockClear(); + }); + + it('prunes stale sessions before listing children', async () => { + const children: TrackedSession[] = [ + { startedBy: 'daemon', happySessionId: 'alive', pid: 111 }, + { startedBy: 'daemon', pid: 222 } + ]; + const pruneStaleSessions = vi.fn(() => { + children.splice(1, 1); + return 1; + }); + + const server = await startDaemonControlServer({ + getChildren: () => children, + pruneStaleSessions, + stopSession: vi.fn(() => true), + spawnSession, + requestShutdown: vi.fn(), + onHappySessionWebhook: vi.fn() + }); + stopServer = server.stop; + + const response = await fetch(`http://127.0.0.1:${server.port}/list`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: '{}' + }); + + expect(response.ok).toBe(true); + expect(pruneStaleSessions).toHaveBeenCalledTimes(1); + await expect(response.json()).resolves.toEqual({ + children: [ + { + startedBy: 'daemon', + happySessionId: 'alive', + pid: 111 + } + ] + }); + }); + + it('prunes stale sessions before stopping a session', async () => { + const pruneStaleSessions = vi.fn(() => 1); + const stopSession = vi.fn(() => true); + + const server = await startDaemonControlServer({ + getChildren: () => [], + pruneStaleSessions, + stopSession, + spawnSession, + requestShutdown: vi.fn(), + onHappySessionWebhook: vi.fn() + }); + stopServer = server.stop; + + const response = await fetch(`http://127.0.0.1:${server.port}/stop-session`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ sessionId: 'alive' }) + }); + + expect(response.ok).toBe(true); + expect(pruneStaleSessions).toHaveBeenCalledTimes(1); + expect(stopSession).toHaveBeenCalledWith('alive'); + await expect(response.json()).resolves.toEqual({ success: true }); + }); +}); diff --git a/packages/happy-cli/src/daemon/controlServer.ts b/packages/happy-cli/src/daemon/controlServer.ts index e0e05f9bec..4881e89f98 100644 --- a/packages/happy-cli/src/daemon/controlServer.ts +++ b/packages/happy-cli/src/daemon/controlServer.ts @@ -13,12 +13,14 @@ import { SpawnSessionOptions, SpawnSessionResult } from '@/modules/common/regist export function startDaemonControlServer({ getChildren, + pruneStaleSessions, stopSession, spawnSession, requestShutdown, onHappySessionWebhook }: { getChildren: () => TrackedSession[]; + pruneStaleSessions: () => number; stopSession: (sessionId: string) => boolean; spawnSession: (options: SpawnSessionOptions) => Promise; requestShutdown: () => void; @@ -70,8 +72,9 @@ export function startDaemonControlServer({ } } }, async () => { + const pruned = pruneStaleSessions(); const children = getChildren(); - logger.debug(`[CONTROL SERVER] Listing ${children.length} sessions`); + logger.debug(`[CONTROL SERVER] Listing ${children.length} sessions (pruned ${pruned})`); return { children: children .filter(child => child.happySessionId !== undefined) @@ -97,8 +100,9 @@ export function startDaemonControlServer({ } }, async (request) => { const { sessionId } = request.body; + const pruned = pruneStaleSessions(); - logger.debug(`[CONTROL SERVER] Stop session request: ${sessionId}`); + logger.debug(`[CONTROL SERVER] Stop session request: ${sessionId} (pruned ${pruned})`); const success = stopSession(sessionId); return { success }; }); @@ -208,4 +212,4 @@ export function startDaemonControlServer({ }); }); }); -} \ No newline at end of file +} diff --git a/packages/happy-cli/src/daemon/daemon.integration.test.ts b/packages/happy-cli/src/daemon/daemon.integration.test.ts index 9b566c9076..cb59698201 100644 --- a/packages/happy-cli/src/daemon/daemon.integration.test.ts +++ b/packages/happy-cli/src/daemon/daemon.integration.test.ts @@ -112,6 +112,16 @@ describe.skipIf(!await isServerHealthy())('Daemon Integration Tests', { timeout: }); it('should track session-started webhook from terminal session', async () => { + const fakeTerminalProcess = spawn(process.execPath, ['-e', 'setInterval(() => {}, 1000)'], { + detached: true, + stdio: 'ignore' + }); + fakeTerminalProcess.unref(); + + if (!fakeTerminalProcess.pid) { + throw new Error('Failed to spawn fake terminal process'); + } + // Simulate a terminal-started session reporting to daemon const mockMetadata: Metadata = { path: '/test/path', @@ -120,21 +130,61 @@ describe.skipIf(!await isServerHealthy())('Daemon Integration Tests', { timeout: happyHomeDir: '/test/happy-home', happyLibDir: '/test/happy-lib', happyToolsDir: '/test/happy-tools', - hostPid: 99999, + hostPid: fakeTerminalProcess.pid, startedBy: 'terminal', machineId: 'test-machine-123' }; - await notifyDaemonSessionStarted('test-session-123', mockMetadata); + try { + await notifyDaemonSessionStarted('test-session-123', mockMetadata); - // Verify session is tracked - const sessions = await listDaemonSessions(); - expect(sessions).toHaveLength(1); - - const tracked = sessions[0]; - expect(tracked.startedBy).toBe('happy directly - likely by user from terminal'); - expect(tracked.happySessionId).toBe('test-session-123'); - expect(tracked.pid).toBe(99999); + // Verify session is tracked + const sessions = await listDaemonSessions(); + expect(sessions).toHaveLength(1); + + const tracked = sessions[0]; + expect(tracked.startedBy).toBe('happy directly - likely by user from terminal'); + expect(tracked.happySessionId).toBe('test-session-123'); + expect(tracked.pid).toBe(fakeTerminalProcess.pid); + } finally { + try { + process.kill(fakeTerminalProcess.pid, 'SIGTERM'); + } catch { + // Process may already be gone + } + } + }); + + it('should prune dead terminal sessions immediately when listing sessions', async () => { + const fakeTerminalProcess = spawn(process.execPath, ['-e', 'setInterval(() => {}, 1000)'], { + detached: true, + stdio: 'ignore' + }); + fakeTerminalProcess.unref(); + + if (!fakeTerminalProcess.pid) { + throw new Error('Failed to spawn fake terminal process'); + } + + const mockMetadata: Metadata = { + path: '/test/path', + host: 'test-host', + homeDir: '/test/home', + happyHomeDir: '/test/happy-home', + happyLibDir: '/test/happy-lib', + happyToolsDir: '/test/happy-tools', + hostPid: fakeTerminalProcess.pid, + startedBy: 'terminal', + machineId: 'test-machine-123' + }; + + await notifyDaemonSessionStarted('dead-terminal-session', mockMetadata); + expect(await listDaemonSessions()).toHaveLength(1); + + process.kill(fakeTerminalProcess.pid, 'SIGTERM'); + await new Promise(resolve => setTimeout(resolve, 200)); + + expect(await listDaemonSessions()).toEqual([]); }); it('should spawn & stop a session via HTTP (not testing RPC route, but similar enough)', async () => { @@ -470,4 +520,4 @@ describe.skipIf(!await isServerHealthy())('Daemon Integration Tests', { timeout: // TODO: Test npm uninstall scenario - daemon should gracefully handle when happy-coder is uninstalled // Current behavior: daemon tries to spawn new daemon on version mismatch but dist/index.mjs is gone // Expected: daemon should detect missing entrypoint and either exit cleanly or at minimum not respawn infinitely -}); \ No newline at end of file +}); diff --git a/packages/happy-cli/src/daemon/run.ts b/packages/happy-cli/src/daemon/run.ts index 75889d14e9..7555762db3 100644 --- a/packages/happy-cli/src/daemon/run.ts +++ b/packages/happy-cli/src/daemon/run.ts @@ -17,6 +17,7 @@ import { writeDaemonState, DaemonLocallyPersistedState, readDaemonState, acquire import { cleanupDaemonState, isDaemonRunningCurrentlyInstalledHappyVersion, stopDaemon } from './controlClient'; import { startDaemonControlServer } from './controlServer'; +import { pruneStaleTrackedSessions } from './sessionTracking'; import { readFileSync } from 'fs'; import { join } from 'path'; import { projectPath } from '@/projectPath'; @@ -172,6 +173,13 @@ export async function startDaemon(): Promise { // Helper functions const getCurrentChildren = () => Array.from(pidToTrackedSession.values()); + const pruneStaleSessions = () => { + const removed = pruneStaleTrackedSessions(pidToTrackedSession); + if (removed > 0) { + logger.debug(`[DAEMON RUN] Pruned ${removed} stale tracked sessions`); + } + return removed; + }; // Handle webhook from happy session reporting itself const onHappySessionWebhook = (sessionId: string, sessionMetadata: Metadata) => { @@ -597,6 +605,7 @@ export async function startDaemon(): Promise { // Stop a session by sessionId or PID fallback const stopSession = (sessionId: string): boolean => { logger.debug(`[DAEMON RUN] Attempting to stop session ${sessionId}`); + pruneStaleSessions(); // Try to find by sessionId first for (const [pid, session] of pidToTrackedSession.entries()) { @@ -634,11 +643,13 @@ export async function startDaemon(): Promise { const onChildExited = (pid: number) => { logger.debug(`[DAEMON RUN] Removing exited process PID ${pid} from tracking`); pidToTrackedSession.delete(pid); + pidToAwaiter.delete(pid); }; // Start control server const { port: controlPort, stop: stopControlServer } = await startDaemonControlServer({ getChildren: getCurrentChildren, + pruneStaleSessions, stopSession, spawnSession, requestShutdown: () => requestShutdown('happy-cli'), @@ -705,17 +716,7 @@ export async function startDaemon(): Promise { logger.debug(`[DAEMON RUN] Health check started at ${new Date().toLocaleString()}`); } - // Prune stale sessions - for (const [pid, _] of pidToTrackedSession.entries()) { - try { - // Check if process is still alive (signal 0 doesn't kill, just checks) - process.kill(pid, 0); - } catch (error) { - // Process is dead, remove from tracking - logger.debug(`[DAEMON RUN] Removing stale session with PID ${pid} (process no longer exists)`); - pidToTrackedSession.delete(pid); - } - } + pruneStaleSessions(); // Check if daemon needs update // If version on disk is different from the one in package.json - we need to restart diff --git a/packages/happy-cli/src/daemon/sessionTracking.test.ts b/packages/happy-cli/src/daemon/sessionTracking.test.ts new file mode 100644 index 0000000000..15b0ba02c1 --- /dev/null +++ b/packages/happy-cli/src/daemon/sessionTracking.test.ts @@ -0,0 +1,44 @@ +import { describe, expect, it, vi } from 'vitest'; + +import { pruneStaleTrackedSessions } from './sessionTracking'; +import { TrackedSession } from './types'; + +function makeSession(pid: number, overrides: Partial = {}): TrackedSession { + return { + pid, + startedBy: 'happy directly - likely by user from terminal', + happySessionId: `session-${pid}`, + ...overrides + }; +} + +describe('pruneStaleTrackedSessions', () => { + it('removes only dead sessions and returns the number pruned', () => { + const tracked = new Map([ + [111, makeSession(111)], + [222, makeSession(222)], + [333, makeSession(333)] + ]); + + const isPidAlive = vi.fn((pid: number) => pid !== 222); + + const removed = pruneStaleTrackedSessions(tracked, isPidAlive); + + expect(removed).toBe(1); + expect(isPidAlive).toHaveBeenCalledTimes(3); + expect(Array.from(tracked.keys())).toEqual([111, 333]); + }); + + it('treats invalid pids as stale when using a custom liveness check', () => { + const tracked = new Map([ + [0, makeSession(0)], + [-1, makeSession(-1)], + [444, makeSession(444)] + ]); + + const removed = pruneStaleTrackedSessions(tracked, (pid) => pid === 444); + + expect(removed).toBe(2); + expect(Array.from(tracked.keys())).toEqual([444]); + }); +}); diff --git a/packages/happy-cli/src/daemon/sessionTracking.ts b/packages/happy-cli/src/daemon/sessionTracking.ts new file mode 100644 index 0000000000..e3c0879f73 --- /dev/null +++ b/packages/happy-cli/src/daemon/sessionTracking.ts @@ -0,0 +1,34 @@ +import { TrackedSession } from './types'; + +export type IsPidAlive = (pid: number) => boolean; + +export const defaultIsPidAlive: IsPidAlive = (pid) => { + if (!Number.isInteger(pid) || pid <= 0) { + return false; + } + + try { + process.kill(pid, 0); + return true; + } catch (error) { + if (typeof error === 'object' && error !== null && 'code' in error && error.code === 'EPERM') { + return true; + } + return false; + } +}; + +export function pruneStaleTrackedSessions( + pidToTrackedSession: Map, + isPidAlive: IsPidAlive = defaultIsPidAlive +): number { + let removed = 0; + + for (const [pid] of pidToTrackedSession.entries()) { + if (isPidAlive(pid)) continue; + pidToTrackedSession.delete(pid); + removed += 1; + } + + return removed; +} From a35f92c7b8aa45ad36d236c0aa0ad0450503009c Mon Sep 17 00:00:00 2001 From: KevinChenCodexMini Date: Tue, 10 Mar 2026 10:05:15 +0800 Subject: [PATCH 4/9] Handle stale session archive fallback --- packages/happy-app/sources/sync/apiSocket.ts | 6 +- packages/happy-app/sources/sync/ops.test.ts | 72 +++++++++++++ packages/happy-app/sources/sync/ops.ts | 40 ++++++- .../api/routes/sessionRoutes.archive.test.ts | 86 +++++++++++++++ .../sources/app/api/routes/sessionRoutes.ts | 24 ++++- .../happy-server/sources/app/api/socket.ts | 9 +- .../markDisconnectedSessionInactive.test.ts | 50 +++++++++ .../markDisconnectedSessionInactive.ts | 22 ++++ .../app/session/sessionArchive.test.ts | 102 ++++++++++++++++++ .../sources/app/session/sessionArchive.ts | 64 +++++++++++ 10 files changed, 468 insertions(+), 7 deletions(-) create mode 100644 packages/happy-app/sources/sync/ops.test.ts create mode 100644 packages/happy-server/sources/app/api/routes/sessionRoutes.archive.test.ts create mode 100644 packages/happy-server/sources/app/session/markDisconnectedSessionInactive.test.ts create mode 100644 packages/happy-server/sources/app/session/markDisconnectedSessionInactive.ts create mode 100644 packages/happy-server/sources/app/session/sessionArchive.test.ts create mode 100644 packages/happy-server/sources/app/session/sessionArchive.ts diff --git a/packages/happy-app/sources/sync/apiSocket.ts b/packages/happy-app/sources/sync/apiSocket.ts index 7e64ae5835..3fbdafaaf2 100644 --- a/packages/happy-app/sources/sync/apiSocket.ts +++ b/packages/happy-app/sources/sync/apiSocket.ts @@ -125,7 +125,7 @@ class ApiSocket { if (result.ok) { return await sessionEncryption.decryptRaw(result.result) as R; } - throw new Error('RPC call failed'); + throw new Error(result.error || 'RPC call failed'); } /** @@ -145,7 +145,7 @@ class ApiSocket { if (result.ok) { return await machineEncryption.decryptRaw(result.result) as R; } - throw new Error('RPC call failed'); + throw new Error(result.error || 'RPC call failed'); } send(event: string, data: any) { @@ -259,4 +259,4 @@ class ApiSocket { // Singleton Export // -export const apiSocket = new ApiSocket(); \ No newline at end of file +export const apiSocket = new ApiSocket(); diff --git a/packages/happy-app/sources/sync/ops.test.ts b/packages/happy-app/sources/sync/ops.test.ts new file mode 100644 index 0000000000..b5cae136bc --- /dev/null +++ b/packages/happy-app/sources/sync/ops.test.ts @@ -0,0 +1,72 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; +import { sessionKill } from "./ops"; + +const { + sessionRPCMock, + requestMock +} = vi.hoisted(() => ({ + sessionRPCMock: vi.fn(), + requestMock: vi.fn() +})); + +vi.mock("./apiSocket", () => ({ + apiSocket: { + sessionRPC: sessionRPCMock, + request: requestMock + } +})); + +vi.mock("./sync", () => ({ + sync: {} +})); + +describe("sessionKill", () => { + beforeEach(() => { + sessionRPCMock.mockReset(); + requestMock.mockReset(); + }); + + it("returns the RPC response when the session is reachable", async () => { + sessionRPCMock.mockResolvedValue({ + success: true, + message: "Killing happy-cli process" + }); + + const result = await sessionKill("session-1"); + + expect(result).toEqual({ + success: true, + message: "Killing happy-cli process" + }); + expect(requestMock).not.toHaveBeenCalled(); + }); + + it("falls back to the server archive endpoint when the session RPC is unavailable", async () => { + sessionRPCMock.mockRejectedValue(new Error("RPC method not available")); + requestMock.mockResolvedValue({ + ok: true + }); + + const result = await sessionKill("session-1"); + + expect(result).toEqual({ + success: true, + message: "Session archived" + }); + expect(requestMock).toHaveBeenCalledWith("/v1/sessions/session-1/archive", { + method: "POST" + }); + }); + + it("returns the original error when the failure is not a stale-session case", async () => { + sessionRPCMock.mockRejectedValue(new Error("RPC call timed out")); + + const result = await sessionKill("session-1"); + + expect(result).toEqual({ + success: false, + message: "RPC call timed out" + }); + expect(requestMock).not.toHaveBeenCalled(); + }); +}); diff --git a/packages/happy-app/sources/sync/ops.ts b/packages/happy-app/sources/sync/ops.ts index 07f70e6949..dc1dc1975e 100644 --- a/packages/happy-app/sources/sync/ops.ts +++ b/packages/happy-app/sources/sync/ops.ts @@ -126,6 +126,36 @@ interface SessionKillResponse { message: string; } +function shouldFallbackToServerArchive(message: string): boolean { + return /rpc method not available|session encryption not found|socket not connected/i.test(message); +} + +async function archiveSessionOnServer(sessionId: string, fallbackMessage: string): Promise { + try { + const response = await apiSocket.request(`/v1/sessions/${sessionId}/archive`, { + method: 'POST' + }); + + if (response.ok) { + return { + success: true, + message: 'Session archived' + }; + } + + const errorText = await response.text(); + return { + success: false, + message: errorText || fallbackMessage + }; + } catch (error) { + return { + success: false, + message: error instanceof Error ? error.message : fallbackMessage + }; + } +} + // Response types for spawn session export type SpawnSessionResult = | { type: 'success'; sessionId: string } @@ -484,9 +514,15 @@ export async function sessionKill(sessionId: string): Promise ({ + sessionArchiveMock: vi.fn() +})); + +vi.mock("@/storage/db", () => ({ + db: {} +})); + +vi.mock("@/app/events/eventRouter", () => ({ + eventRouter: {}, + buildNewSessionUpdate: vi.fn() +})); + +vi.mock("@/utils/log", () => ({ + log: vi.fn() +})); + +vi.mock("@/utils/randomKeyNaked", () => ({ + randomKeyNaked: vi.fn(() => "update-id") +})); + +vi.mock("@/storage/seq", () => ({ + allocateUserSeq: vi.fn() +})); + +vi.mock("@/app/session/sessionDelete", () => ({ + sessionDelete: vi.fn() +})); + +vi.mock("@/app/session/sessionArchive", () => ({ + sessionArchive: sessionArchiveMock +})); + +describe("sessionRoutes archive endpoint", () => { + beforeEach(() => { + sessionArchiveMock.mockReset(); + }); + + async function createApp() { + const app = fastify().withTypeProvider(); + app.setValidatorCompiler(validatorCompiler); + app.setSerializerCompiler(serializerCompiler); + app.decorate("authenticate", async (request: any) => { + request.userId = "user-1"; + }); + + sessionRoutes(app as any); + await app.ready(); + return app; + } + + it("archives a session through the HTTP route", async () => { + const app = await createApp(); + sessionArchiveMock.mockResolvedValue({ found: true, changed: true }); + + const response = await app.inject({ + method: "POST", + url: "/v1/sessions/session-1/archive" + }); + + expect(response.statusCode).toBe(200); + expect(response.json()).toEqual({ success: true }); + expect(sessionArchiveMock).toHaveBeenCalledWith(expect.objectContaining({ uid: "user-1" }), "session-1"); + + await app.close(); + }); + + it("returns 404 when the session does not exist", async () => { + const app = await createApp(); + sessionArchiveMock.mockResolvedValue({ found: false, changed: false }); + + const response = await app.inject({ + method: "POST", + url: "/v1/sessions/missing-session/archive" + }); + + expect(response.statusCode).toBe(404); + + await app.close(); + }); +}); diff --git a/packages/happy-server/sources/app/api/routes/sessionRoutes.ts b/packages/happy-server/sources/app/api/routes/sessionRoutes.ts index e4f9e7a997..f9430134a5 100644 --- a/packages/happy-server/sources/app/api/routes/sessionRoutes.ts +++ b/packages/happy-server/sources/app/api/routes/sessionRoutes.ts @@ -7,6 +7,8 @@ import { log } from "@/utils/log"; import { randomKeyNaked } from "@/utils/randomKeyNaked"; import { allocateUserSeq } from "@/storage/seq"; import { sessionDelete } from "@/app/session/sessionDelete"; +import { sessionArchive } from "@/app/session/sessionArchive"; +import { Context } from "@/context"; export function sessionRoutes(app: Fastify) { @@ -354,6 +356,26 @@ export function sessionRoutes(app: Fastify) { }); }); + // Delete session + app.post('/v1/sessions/:sessionId/archive', { + schema: { + params: z.object({ + sessionId: z.string() + }) + }, + preHandler: app.authenticate + }, async (request, reply) => { + const userId = request.userId; + const { sessionId } = request.params; + + const archived = await sessionArchive(Context.create(userId), sessionId); + if (!archived.found) { + return reply.code(404).send({ error: 'Session not found or not owned by user' }); + } + + return reply.send({ success: true }); + }); + // Delete session app.delete('/v1/sessions/:sessionId', { schema: { @@ -374,4 +396,4 @@ export function sessionRoutes(app: Fastify) { return reply.send({ success: true }); }); -} \ No newline at end of file +} diff --git a/packages/happy-server/sources/app/api/socket.ts b/packages/happy-server/sources/app/api/socket.ts index f085960b6f..e4da02599d 100644 --- a/packages/happy-server/sources/app/api/socket.ts +++ b/packages/happy-server/sources/app/api/socket.ts @@ -12,6 +12,7 @@ import { sessionUpdateHandler } from "./socket/sessionUpdateHandler"; import { machineUpdateHandler } from "./socket/machineUpdateHandler"; import { artifactUpdateHandler } from "./socket/artifactUpdateHandler"; import { accessKeyHandler } from "./socket/accessKeyHandler"; +import { markDisconnectedSessionInactive } from "@/app/session/markDisconnectedSessionInactive"; export function startSocket(app: Fastify) { const io = new Server(app.server, { @@ -120,6 +121,12 @@ export function startSocket(app: Fastify) { log({ module: 'websocket' }, `User disconnected: ${userId}`); + if (connection.connectionType === 'session-scoped') { + void markDisconnectedSessionInactive(userId, connection.sessionId, Date.now()).catch((error) => { + log({ module: 'websocket', level: 'error' }, `Failed to mark disconnected session inactive: ${error}`); + }); + } + // Broadcast daemon offline status if (connection.connectionType === 'machine-scoped') { const machineActivity = buildMachineActivityEphemeral(connection.machineId, false, Date.now()); @@ -152,4 +159,4 @@ export function startSocket(app: Fastify) { onShutdown('api', async () => { await io.close(); }); -} \ No newline at end of file +} diff --git a/packages/happy-server/sources/app/session/markDisconnectedSessionInactive.test.ts b/packages/happy-server/sources/app/session/markDisconnectedSessionInactive.test.ts new file mode 100644 index 0000000000..3091ae7867 --- /dev/null +++ b/packages/happy-server/sources/app/session/markDisconnectedSessionInactive.test.ts @@ -0,0 +1,50 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; +import { markDisconnectedSessionInactive } from "./markDisconnectedSessionInactive"; + +const { + getConnectionsMock, + sessionArchiveMock +} = vi.hoisted(() => ({ + getConnectionsMock: vi.fn(), + sessionArchiveMock: vi.fn() +})); + +vi.mock("@/app/events/eventRouter", () => ({ + eventRouter: { + getConnections: getConnectionsMock + } +})); + +vi.mock("./sessionArchive", () => ({ + sessionArchive: sessionArchiveMock +})); + +describe("markDisconnectedSessionInactive", () => { + beforeEach(() => { + getConnectionsMock.mockReset(); + sessionArchiveMock.mockReset(); + }); + + it("archives the session when no other live session-scoped connection remains", async () => { + getConnectionsMock.mockReturnValue(new Set([ + { connectionType: "user-scoped", userId: "user-1" } + ])); + sessionArchiveMock.mockResolvedValue({ found: true, changed: true }); + + const result = await markDisconnectedSessionInactive("user-1", "session-1", 1700000000000); + + expect(result).toBe(true); + expect(sessionArchiveMock).toHaveBeenCalledWith(expect.objectContaining({ uid: "user-1" }), "session-1", 1700000000000); + }); + + it("does nothing when another live connection for the same session still exists", async () => { + getConnectionsMock.mockReturnValue(new Set([ + { connectionType: "session-scoped", userId: "user-1", sessionId: "session-1" } + ])); + + const result = await markDisconnectedSessionInactive("user-1", "session-1", 1700000000000); + + expect(result).toBe(false); + expect(sessionArchiveMock).not.toHaveBeenCalled(); + }); +}); diff --git a/packages/happy-server/sources/app/session/markDisconnectedSessionInactive.ts b/packages/happy-server/sources/app/session/markDisconnectedSessionInactive.ts new file mode 100644 index 0000000000..cd0c93090c --- /dev/null +++ b/packages/happy-server/sources/app/session/markDisconnectedSessionInactive.ts @@ -0,0 +1,22 @@ +import { eventRouter } from "@/app/events/eventRouter"; +import { Context } from "@/context"; +import { sessionArchive } from "./sessionArchive"; + +export async function markDisconnectedSessionInactive( + userId: string, + sessionId: string, + activeAt: number = Date.now() +): Promise { + const remainingConnections = eventRouter.getConnections(userId); + const hasOtherLiveSessionConnection = Array.from(remainingConnections ?? []).some((connection) => ( + connection.connectionType === "session-scoped" && + connection.sessionId === sessionId + )); + + if (hasOtherLiveSessionConnection) { + return false; + } + + const result = await sessionArchive(Context.create(userId), sessionId, activeAt); + return result.changed; +} diff --git a/packages/happy-server/sources/app/session/sessionArchive.test.ts b/packages/happy-server/sources/app/session/sessionArchive.test.ts new file mode 100644 index 0000000000..a503fe9642 --- /dev/null +++ b/packages/happy-server/sources/app/session/sessionArchive.test.ts @@ -0,0 +1,102 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; +import { Context } from "@/context"; +import { sessionArchive } from "./sessionArchive"; + +const { + sessionFindFirstMock, + sessionUpdateManyMock, + emitEphemeralMock, + buildSessionActivityEphemeralMock +} = vi.hoisted(() => ({ + sessionFindFirstMock: vi.fn(), + sessionUpdateManyMock: vi.fn(), + emitEphemeralMock: vi.fn(), + buildSessionActivityEphemeralMock: vi.fn((sessionId: string, active: boolean, activeAt: number, thinking: boolean) => ({ + type: "activity", + id: sessionId, + active, + activeAt, + thinking + })) +})); + +vi.mock("@/storage/db", () => ({ + db: { + session: { + findFirst: sessionFindFirstMock, + updateMany: sessionUpdateManyMock + } + } +})); + +vi.mock("@/app/events/eventRouter", () => ({ + eventRouter: { + emitEphemeral: emitEphemeralMock + }, + buildSessionActivityEphemeral: buildSessionActivityEphemeralMock +})); + +vi.mock("@/utils/log", () => ({ + log: vi.fn() +})); + +describe("sessionArchive", () => { + beforeEach(() => { + sessionFindFirstMock.mockReset(); + sessionUpdateManyMock.mockReset(); + emitEphemeralMock.mockReset(); + buildSessionActivityEphemeralMock.mockClear(); + }); + + it("archives an active session and emits an offline activity update", async () => { + sessionFindFirstMock.mockResolvedValue({ id: "session-1", active: true }); + sessionUpdateManyMock.mockResolvedValue({ count: 1 }); + + const result = await sessionArchive(Context.create("user-1"), "session-1", 1700000000000); + + expect(result).toEqual({ found: true, changed: true }); + expect(sessionUpdateManyMock).toHaveBeenCalledWith({ + where: { + id: "session-1", + accountId: "user-1", + active: true + }, + data: { + active: false, + lastActiveAt: new Date(1700000000000) + } + }); + expect(buildSessionActivityEphemeralMock).toHaveBeenCalledWith("session-1", false, 1700000000000, false); + expect(emitEphemeralMock).toHaveBeenCalledWith({ + userId: "user-1", + payload: { + type: "activity", + id: "session-1", + active: false, + activeAt: 1700000000000, + thinking: false + }, + recipientFilter: { type: "user-scoped-only" } + }); + }); + + it("returns success without emitting when the session is already inactive", async () => { + sessionFindFirstMock.mockResolvedValue({ id: "session-1", active: false }); + + const result = await sessionArchive(Context.create("user-1"), "session-1", 1700000000000); + + expect(result).toEqual({ found: true, changed: false }); + expect(sessionUpdateManyMock).not.toHaveBeenCalled(); + expect(emitEphemeralMock).not.toHaveBeenCalled(); + }); + + it("returns not found for a missing session", async () => { + sessionFindFirstMock.mockResolvedValue(null); + + const result = await sessionArchive(Context.create("user-1"), "missing-session", 1700000000000); + + expect(result).toEqual({ found: false, changed: false }); + expect(sessionUpdateManyMock).not.toHaveBeenCalled(); + expect(emitEphemeralMock).not.toHaveBeenCalled(); + }); +}); diff --git a/packages/happy-server/sources/app/session/sessionArchive.ts b/packages/happy-server/sources/app/session/sessionArchive.ts new file mode 100644 index 0000000000..c0599ca26a --- /dev/null +++ b/packages/happy-server/sources/app/session/sessionArchive.ts @@ -0,0 +1,64 @@ +import { buildSessionActivityEphemeral, eventRouter } from "@/app/events/eventRouter"; +import { Context } from "@/context"; +import { db } from "@/storage/db"; +import { log } from "@/utils/log"; + +export interface SessionArchiveResult { + found: boolean; + changed: boolean; +} + +export async function sessionArchive( + ctx: Context, + sessionId: string, + activeAt: number = Date.now() +): Promise { + const session = await db.session.findFirst({ + where: { + id: sessionId, + accountId: ctx.uid + }, + select: { + id: true, + active: true + } + }); + + if (!session) { + log({ + module: "session-archive", + userId: ctx.uid, + sessionId + }, "Session not found or not owned by user"); + return { found: false, changed: false }; + } + + if (!session.active) { + return { found: true, changed: false }; + } + + const clampedActiveAt = Math.min(activeAt, Date.now()); + const updated = await db.session.updateMany({ + where: { + id: sessionId, + accountId: ctx.uid, + active: true + }, + data: { + active: false, + lastActiveAt: new Date(clampedActiveAt) + } + }); + + if (updated.count === 0) { + return { found: true, changed: false }; + } + + eventRouter.emitEphemeral({ + userId: ctx.uid, + payload: buildSessionActivityEphemeral(sessionId, false, clampedActiveAt, false), + recipientFilter: { type: "user-scoped-only" } + }); + + return { found: true, changed: true }; +} From 770e5d44a39d31278647e6656ec62b9e7ae283bc Mon Sep 17 00:00:00 2001 From: KevinChenCodexMini Date: Tue, 10 Mar 2026 10:08:33 +0800 Subject: [PATCH 5/9] Revert "Handle stale session archive fallback" This reverts commit a35f92c7b8aa45ad36d236c0aa0ad0450503009c. --- packages/happy-app/sources/sync/apiSocket.ts | 6 +- packages/happy-app/sources/sync/ops.test.ts | 72 ------------- packages/happy-app/sources/sync/ops.ts | 40 +------ .../api/routes/sessionRoutes.archive.test.ts | 86 --------------- .../sources/app/api/routes/sessionRoutes.ts | 24 +---- .../happy-server/sources/app/api/socket.ts | 9 +- .../markDisconnectedSessionInactive.test.ts | 50 --------- .../markDisconnectedSessionInactive.ts | 22 ---- .../app/session/sessionArchive.test.ts | 102 ------------------ .../sources/app/session/sessionArchive.ts | 64 ----------- 10 files changed, 7 insertions(+), 468 deletions(-) delete mode 100644 packages/happy-app/sources/sync/ops.test.ts delete mode 100644 packages/happy-server/sources/app/api/routes/sessionRoutes.archive.test.ts delete mode 100644 packages/happy-server/sources/app/session/markDisconnectedSessionInactive.test.ts delete mode 100644 packages/happy-server/sources/app/session/markDisconnectedSessionInactive.ts delete mode 100644 packages/happy-server/sources/app/session/sessionArchive.test.ts delete mode 100644 packages/happy-server/sources/app/session/sessionArchive.ts diff --git a/packages/happy-app/sources/sync/apiSocket.ts b/packages/happy-app/sources/sync/apiSocket.ts index 3fbdafaaf2..7e64ae5835 100644 --- a/packages/happy-app/sources/sync/apiSocket.ts +++ b/packages/happy-app/sources/sync/apiSocket.ts @@ -125,7 +125,7 @@ class ApiSocket { if (result.ok) { return await sessionEncryption.decryptRaw(result.result) as R; } - throw new Error(result.error || 'RPC call failed'); + throw new Error('RPC call failed'); } /** @@ -145,7 +145,7 @@ class ApiSocket { if (result.ok) { return await machineEncryption.decryptRaw(result.result) as R; } - throw new Error(result.error || 'RPC call failed'); + throw new Error('RPC call failed'); } send(event: string, data: any) { @@ -259,4 +259,4 @@ class ApiSocket { // Singleton Export // -export const apiSocket = new ApiSocket(); +export const apiSocket = new ApiSocket(); \ No newline at end of file diff --git a/packages/happy-app/sources/sync/ops.test.ts b/packages/happy-app/sources/sync/ops.test.ts deleted file mode 100644 index b5cae136bc..0000000000 --- a/packages/happy-app/sources/sync/ops.test.ts +++ /dev/null @@ -1,72 +0,0 @@ -import { beforeEach, describe, expect, it, vi } from "vitest"; -import { sessionKill } from "./ops"; - -const { - sessionRPCMock, - requestMock -} = vi.hoisted(() => ({ - sessionRPCMock: vi.fn(), - requestMock: vi.fn() -})); - -vi.mock("./apiSocket", () => ({ - apiSocket: { - sessionRPC: sessionRPCMock, - request: requestMock - } -})); - -vi.mock("./sync", () => ({ - sync: {} -})); - -describe("sessionKill", () => { - beforeEach(() => { - sessionRPCMock.mockReset(); - requestMock.mockReset(); - }); - - it("returns the RPC response when the session is reachable", async () => { - sessionRPCMock.mockResolvedValue({ - success: true, - message: "Killing happy-cli process" - }); - - const result = await sessionKill("session-1"); - - expect(result).toEqual({ - success: true, - message: "Killing happy-cli process" - }); - expect(requestMock).not.toHaveBeenCalled(); - }); - - it("falls back to the server archive endpoint when the session RPC is unavailable", async () => { - sessionRPCMock.mockRejectedValue(new Error("RPC method not available")); - requestMock.mockResolvedValue({ - ok: true - }); - - const result = await sessionKill("session-1"); - - expect(result).toEqual({ - success: true, - message: "Session archived" - }); - expect(requestMock).toHaveBeenCalledWith("/v1/sessions/session-1/archive", { - method: "POST" - }); - }); - - it("returns the original error when the failure is not a stale-session case", async () => { - sessionRPCMock.mockRejectedValue(new Error("RPC call timed out")); - - const result = await sessionKill("session-1"); - - expect(result).toEqual({ - success: false, - message: "RPC call timed out" - }); - expect(requestMock).not.toHaveBeenCalled(); - }); -}); diff --git a/packages/happy-app/sources/sync/ops.ts b/packages/happy-app/sources/sync/ops.ts index dc1dc1975e..07f70e6949 100644 --- a/packages/happy-app/sources/sync/ops.ts +++ b/packages/happy-app/sources/sync/ops.ts @@ -126,36 +126,6 @@ interface SessionKillResponse { message: string; } -function shouldFallbackToServerArchive(message: string): boolean { - return /rpc method not available|session encryption not found|socket not connected/i.test(message); -} - -async function archiveSessionOnServer(sessionId: string, fallbackMessage: string): Promise { - try { - const response = await apiSocket.request(`/v1/sessions/${sessionId}/archive`, { - method: 'POST' - }); - - if (response.ok) { - return { - success: true, - message: 'Session archived' - }; - } - - const errorText = await response.text(); - return { - success: false, - message: errorText || fallbackMessage - }; - } catch (error) { - return { - success: false, - message: error instanceof Error ? error.message : fallbackMessage - }; - } -} - // Response types for spawn session export type SpawnSessionResult = | { type: 'success'; sessionId: string } @@ -514,15 +484,9 @@ export async function sessionKill(sessionId: string): Promise ({ - sessionArchiveMock: vi.fn() -})); - -vi.mock("@/storage/db", () => ({ - db: {} -})); - -vi.mock("@/app/events/eventRouter", () => ({ - eventRouter: {}, - buildNewSessionUpdate: vi.fn() -})); - -vi.mock("@/utils/log", () => ({ - log: vi.fn() -})); - -vi.mock("@/utils/randomKeyNaked", () => ({ - randomKeyNaked: vi.fn(() => "update-id") -})); - -vi.mock("@/storage/seq", () => ({ - allocateUserSeq: vi.fn() -})); - -vi.mock("@/app/session/sessionDelete", () => ({ - sessionDelete: vi.fn() -})); - -vi.mock("@/app/session/sessionArchive", () => ({ - sessionArchive: sessionArchiveMock -})); - -describe("sessionRoutes archive endpoint", () => { - beforeEach(() => { - sessionArchiveMock.mockReset(); - }); - - async function createApp() { - const app = fastify().withTypeProvider(); - app.setValidatorCompiler(validatorCompiler); - app.setSerializerCompiler(serializerCompiler); - app.decorate("authenticate", async (request: any) => { - request.userId = "user-1"; - }); - - sessionRoutes(app as any); - await app.ready(); - return app; - } - - it("archives a session through the HTTP route", async () => { - const app = await createApp(); - sessionArchiveMock.mockResolvedValue({ found: true, changed: true }); - - const response = await app.inject({ - method: "POST", - url: "/v1/sessions/session-1/archive" - }); - - expect(response.statusCode).toBe(200); - expect(response.json()).toEqual({ success: true }); - expect(sessionArchiveMock).toHaveBeenCalledWith(expect.objectContaining({ uid: "user-1" }), "session-1"); - - await app.close(); - }); - - it("returns 404 when the session does not exist", async () => { - const app = await createApp(); - sessionArchiveMock.mockResolvedValue({ found: false, changed: false }); - - const response = await app.inject({ - method: "POST", - url: "/v1/sessions/missing-session/archive" - }); - - expect(response.statusCode).toBe(404); - - await app.close(); - }); -}); diff --git a/packages/happy-server/sources/app/api/routes/sessionRoutes.ts b/packages/happy-server/sources/app/api/routes/sessionRoutes.ts index f9430134a5..e4f9e7a997 100644 --- a/packages/happy-server/sources/app/api/routes/sessionRoutes.ts +++ b/packages/happy-server/sources/app/api/routes/sessionRoutes.ts @@ -7,8 +7,6 @@ import { log } from "@/utils/log"; import { randomKeyNaked } from "@/utils/randomKeyNaked"; import { allocateUserSeq } from "@/storage/seq"; import { sessionDelete } from "@/app/session/sessionDelete"; -import { sessionArchive } from "@/app/session/sessionArchive"; -import { Context } from "@/context"; export function sessionRoutes(app: Fastify) { @@ -356,26 +354,6 @@ export function sessionRoutes(app: Fastify) { }); }); - // Delete session - app.post('/v1/sessions/:sessionId/archive', { - schema: { - params: z.object({ - sessionId: z.string() - }) - }, - preHandler: app.authenticate - }, async (request, reply) => { - const userId = request.userId; - const { sessionId } = request.params; - - const archived = await sessionArchive(Context.create(userId), sessionId); - if (!archived.found) { - return reply.code(404).send({ error: 'Session not found or not owned by user' }); - } - - return reply.send({ success: true }); - }); - // Delete session app.delete('/v1/sessions/:sessionId', { schema: { @@ -396,4 +374,4 @@ export function sessionRoutes(app: Fastify) { return reply.send({ success: true }); }); -} +} \ No newline at end of file diff --git a/packages/happy-server/sources/app/api/socket.ts b/packages/happy-server/sources/app/api/socket.ts index e4da02599d..f085960b6f 100644 --- a/packages/happy-server/sources/app/api/socket.ts +++ b/packages/happy-server/sources/app/api/socket.ts @@ -12,7 +12,6 @@ import { sessionUpdateHandler } from "./socket/sessionUpdateHandler"; import { machineUpdateHandler } from "./socket/machineUpdateHandler"; import { artifactUpdateHandler } from "./socket/artifactUpdateHandler"; import { accessKeyHandler } from "./socket/accessKeyHandler"; -import { markDisconnectedSessionInactive } from "@/app/session/markDisconnectedSessionInactive"; export function startSocket(app: Fastify) { const io = new Server(app.server, { @@ -121,12 +120,6 @@ export function startSocket(app: Fastify) { log({ module: 'websocket' }, `User disconnected: ${userId}`); - if (connection.connectionType === 'session-scoped') { - void markDisconnectedSessionInactive(userId, connection.sessionId, Date.now()).catch((error) => { - log({ module: 'websocket', level: 'error' }, `Failed to mark disconnected session inactive: ${error}`); - }); - } - // Broadcast daemon offline status if (connection.connectionType === 'machine-scoped') { const machineActivity = buildMachineActivityEphemeral(connection.machineId, false, Date.now()); @@ -159,4 +152,4 @@ export function startSocket(app: Fastify) { onShutdown('api', async () => { await io.close(); }); -} +} \ No newline at end of file diff --git a/packages/happy-server/sources/app/session/markDisconnectedSessionInactive.test.ts b/packages/happy-server/sources/app/session/markDisconnectedSessionInactive.test.ts deleted file mode 100644 index 3091ae7867..0000000000 --- a/packages/happy-server/sources/app/session/markDisconnectedSessionInactive.test.ts +++ /dev/null @@ -1,50 +0,0 @@ -import { beforeEach, describe, expect, it, vi } from "vitest"; -import { markDisconnectedSessionInactive } from "./markDisconnectedSessionInactive"; - -const { - getConnectionsMock, - sessionArchiveMock -} = vi.hoisted(() => ({ - getConnectionsMock: vi.fn(), - sessionArchiveMock: vi.fn() -})); - -vi.mock("@/app/events/eventRouter", () => ({ - eventRouter: { - getConnections: getConnectionsMock - } -})); - -vi.mock("./sessionArchive", () => ({ - sessionArchive: sessionArchiveMock -})); - -describe("markDisconnectedSessionInactive", () => { - beforeEach(() => { - getConnectionsMock.mockReset(); - sessionArchiveMock.mockReset(); - }); - - it("archives the session when no other live session-scoped connection remains", async () => { - getConnectionsMock.mockReturnValue(new Set([ - { connectionType: "user-scoped", userId: "user-1" } - ])); - sessionArchiveMock.mockResolvedValue({ found: true, changed: true }); - - const result = await markDisconnectedSessionInactive("user-1", "session-1", 1700000000000); - - expect(result).toBe(true); - expect(sessionArchiveMock).toHaveBeenCalledWith(expect.objectContaining({ uid: "user-1" }), "session-1", 1700000000000); - }); - - it("does nothing when another live connection for the same session still exists", async () => { - getConnectionsMock.mockReturnValue(new Set([ - { connectionType: "session-scoped", userId: "user-1", sessionId: "session-1" } - ])); - - const result = await markDisconnectedSessionInactive("user-1", "session-1", 1700000000000); - - expect(result).toBe(false); - expect(sessionArchiveMock).not.toHaveBeenCalled(); - }); -}); diff --git a/packages/happy-server/sources/app/session/markDisconnectedSessionInactive.ts b/packages/happy-server/sources/app/session/markDisconnectedSessionInactive.ts deleted file mode 100644 index cd0c93090c..0000000000 --- a/packages/happy-server/sources/app/session/markDisconnectedSessionInactive.ts +++ /dev/null @@ -1,22 +0,0 @@ -import { eventRouter } from "@/app/events/eventRouter"; -import { Context } from "@/context"; -import { sessionArchive } from "./sessionArchive"; - -export async function markDisconnectedSessionInactive( - userId: string, - sessionId: string, - activeAt: number = Date.now() -): Promise { - const remainingConnections = eventRouter.getConnections(userId); - const hasOtherLiveSessionConnection = Array.from(remainingConnections ?? []).some((connection) => ( - connection.connectionType === "session-scoped" && - connection.sessionId === sessionId - )); - - if (hasOtherLiveSessionConnection) { - return false; - } - - const result = await sessionArchive(Context.create(userId), sessionId, activeAt); - return result.changed; -} diff --git a/packages/happy-server/sources/app/session/sessionArchive.test.ts b/packages/happy-server/sources/app/session/sessionArchive.test.ts deleted file mode 100644 index a503fe9642..0000000000 --- a/packages/happy-server/sources/app/session/sessionArchive.test.ts +++ /dev/null @@ -1,102 +0,0 @@ -import { beforeEach, describe, expect, it, vi } from "vitest"; -import { Context } from "@/context"; -import { sessionArchive } from "./sessionArchive"; - -const { - sessionFindFirstMock, - sessionUpdateManyMock, - emitEphemeralMock, - buildSessionActivityEphemeralMock -} = vi.hoisted(() => ({ - sessionFindFirstMock: vi.fn(), - sessionUpdateManyMock: vi.fn(), - emitEphemeralMock: vi.fn(), - buildSessionActivityEphemeralMock: vi.fn((sessionId: string, active: boolean, activeAt: number, thinking: boolean) => ({ - type: "activity", - id: sessionId, - active, - activeAt, - thinking - })) -})); - -vi.mock("@/storage/db", () => ({ - db: { - session: { - findFirst: sessionFindFirstMock, - updateMany: sessionUpdateManyMock - } - } -})); - -vi.mock("@/app/events/eventRouter", () => ({ - eventRouter: { - emitEphemeral: emitEphemeralMock - }, - buildSessionActivityEphemeral: buildSessionActivityEphemeralMock -})); - -vi.mock("@/utils/log", () => ({ - log: vi.fn() -})); - -describe("sessionArchive", () => { - beforeEach(() => { - sessionFindFirstMock.mockReset(); - sessionUpdateManyMock.mockReset(); - emitEphemeralMock.mockReset(); - buildSessionActivityEphemeralMock.mockClear(); - }); - - it("archives an active session and emits an offline activity update", async () => { - sessionFindFirstMock.mockResolvedValue({ id: "session-1", active: true }); - sessionUpdateManyMock.mockResolvedValue({ count: 1 }); - - const result = await sessionArchive(Context.create("user-1"), "session-1", 1700000000000); - - expect(result).toEqual({ found: true, changed: true }); - expect(sessionUpdateManyMock).toHaveBeenCalledWith({ - where: { - id: "session-1", - accountId: "user-1", - active: true - }, - data: { - active: false, - lastActiveAt: new Date(1700000000000) - } - }); - expect(buildSessionActivityEphemeralMock).toHaveBeenCalledWith("session-1", false, 1700000000000, false); - expect(emitEphemeralMock).toHaveBeenCalledWith({ - userId: "user-1", - payload: { - type: "activity", - id: "session-1", - active: false, - activeAt: 1700000000000, - thinking: false - }, - recipientFilter: { type: "user-scoped-only" } - }); - }); - - it("returns success without emitting when the session is already inactive", async () => { - sessionFindFirstMock.mockResolvedValue({ id: "session-1", active: false }); - - const result = await sessionArchive(Context.create("user-1"), "session-1", 1700000000000); - - expect(result).toEqual({ found: true, changed: false }); - expect(sessionUpdateManyMock).not.toHaveBeenCalled(); - expect(emitEphemeralMock).not.toHaveBeenCalled(); - }); - - it("returns not found for a missing session", async () => { - sessionFindFirstMock.mockResolvedValue(null); - - const result = await sessionArchive(Context.create("user-1"), "missing-session", 1700000000000); - - expect(result).toEqual({ found: false, changed: false }); - expect(sessionUpdateManyMock).not.toHaveBeenCalled(); - expect(emitEphemeralMock).not.toHaveBeenCalled(); - }); -}); diff --git a/packages/happy-server/sources/app/session/sessionArchive.ts b/packages/happy-server/sources/app/session/sessionArchive.ts deleted file mode 100644 index c0599ca26a..0000000000 --- a/packages/happy-server/sources/app/session/sessionArchive.ts +++ /dev/null @@ -1,64 +0,0 @@ -import { buildSessionActivityEphemeral, eventRouter } from "@/app/events/eventRouter"; -import { Context } from "@/context"; -import { db } from "@/storage/db"; -import { log } from "@/utils/log"; - -export interface SessionArchiveResult { - found: boolean; - changed: boolean; -} - -export async function sessionArchive( - ctx: Context, - sessionId: string, - activeAt: number = Date.now() -): Promise { - const session = await db.session.findFirst({ - where: { - id: sessionId, - accountId: ctx.uid - }, - select: { - id: true, - active: true - } - }); - - if (!session) { - log({ - module: "session-archive", - userId: ctx.uid, - sessionId - }, "Session not found or not owned by user"); - return { found: false, changed: false }; - } - - if (!session.active) { - return { found: true, changed: false }; - } - - const clampedActiveAt = Math.min(activeAt, Date.now()); - const updated = await db.session.updateMany({ - where: { - id: sessionId, - accountId: ctx.uid, - active: true - }, - data: { - active: false, - lastActiveAt: new Date(clampedActiveAt) - } - }); - - if (updated.count === 0) { - return { found: true, changed: false }; - } - - eventRouter.emitEphemeral({ - userId: ctx.uid, - payload: buildSessionActivityEphemeral(sessionId, false, clampedActiveAt, false), - recipientFilter: { type: "user-scoped-only" } - }); - - return { found: true, changed: true }; -} From 16b183ad867f9dd68d1367e818a0c9860bcd0bb5 Mon Sep 17 00:00:00 2001 From: KevinChenCodexMini Date: Tue, 10 Mar 2026 10:15:09 +0800 Subject: [PATCH 6/9] Revert "Prune stale daemon sessions on demand" This reverts commit 73121d5c3f6c6009d2d6826a58374b675f11682b. --- .../src/daemon/controlServer.test.ts | 86 ------------------- .../happy-cli/src/daemon/controlServer.ts | 10 +-- .../src/daemon/daemon.integration.test.ts | 72 +++------------- packages/happy-cli/src/daemon/run.ts | 23 +++-- .../src/daemon/sessionTracking.test.ts | 44 ---------- .../happy-cli/src/daemon/sessionTracking.ts | 34 -------- 6 files changed, 25 insertions(+), 244 deletions(-) delete mode 100644 packages/happy-cli/src/daemon/controlServer.test.ts delete mode 100644 packages/happy-cli/src/daemon/sessionTracking.test.ts delete mode 100644 packages/happy-cli/src/daemon/sessionTracking.ts diff --git a/packages/happy-cli/src/daemon/controlServer.test.ts b/packages/happy-cli/src/daemon/controlServer.test.ts deleted file mode 100644 index 8f1670ac26..0000000000 --- a/packages/happy-cli/src/daemon/controlServer.test.ts +++ /dev/null @@ -1,86 +0,0 @@ -import { afterEach, describe, expect, it, vi } from 'vitest'; - -import { startDaemonControlServer } from './controlServer'; -import { TrackedSession } from './types'; -import { SpawnSessionResult } from '@/modules/common/registerCommonHandlers'; - -describe('startDaemonControlServer', () => { - let stopServer: (() => Promise) | null = null; - const spawnSession = vi.fn(async (): Promise => ({ - type: 'success', - sessionId: 'alive' - })); - - afterEach(async () => { - if (stopServer) { - await stopServer(); - stopServer = null; - } - spawnSession.mockClear(); - }); - - it('prunes stale sessions before listing children', async () => { - const children: TrackedSession[] = [ - { startedBy: 'daemon', happySessionId: 'alive', pid: 111 }, - { startedBy: 'daemon', pid: 222 } - ]; - const pruneStaleSessions = vi.fn(() => { - children.splice(1, 1); - return 1; - }); - - const server = await startDaemonControlServer({ - getChildren: () => children, - pruneStaleSessions, - stopSession: vi.fn(() => true), - spawnSession, - requestShutdown: vi.fn(), - onHappySessionWebhook: vi.fn() - }); - stopServer = server.stop; - - const response = await fetch(`http://127.0.0.1:${server.port}/list`, { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: '{}' - }); - - expect(response.ok).toBe(true); - expect(pruneStaleSessions).toHaveBeenCalledTimes(1); - await expect(response.json()).resolves.toEqual({ - children: [ - { - startedBy: 'daemon', - happySessionId: 'alive', - pid: 111 - } - ] - }); - }); - - it('prunes stale sessions before stopping a session', async () => { - const pruneStaleSessions = vi.fn(() => 1); - const stopSession = vi.fn(() => true); - - const server = await startDaemonControlServer({ - getChildren: () => [], - pruneStaleSessions, - stopSession, - spawnSession, - requestShutdown: vi.fn(), - onHappySessionWebhook: vi.fn() - }); - stopServer = server.stop; - - const response = await fetch(`http://127.0.0.1:${server.port}/stop-session`, { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ sessionId: 'alive' }) - }); - - expect(response.ok).toBe(true); - expect(pruneStaleSessions).toHaveBeenCalledTimes(1); - expect(stopSession).toHaveBeenCalledWith('alive'); - await expect(response.json()).resolves.toEqual({ success: true }); - }); -}); diff --git a/packages/happy-cli/src/daemon/controlServer.ts b/packages/happy-cli/src/daemon/controlServer.ts index 4881e89f98..e0e05f9bec 100644 --- a/packages/happy-cli/src/daemon/controlServer.ts +++ b/packages/happy-cli/src/daemon/controlServer.ts @@ -13,14 +13,12 @@ import { SpawnSessionOptions, SpawnSessionResult } from '@/modules/common/regist export function startDaemonControlServer({ getChildren, - pruneStaleSessions, stopSession, spawnSession, requestShutdown, onHappySessionWebhook }: { getChildren: () => TrackedSession[]; - pruneStaleSessions: () => number; stopSession: (sessionId: string) => boolean; spawnSession: (options: SpawnSessionOptions) => Promise; requestShutdown: () => void; @@ -72,9 +70,8 @@ export function startDaemonControlServer({ } } }, async () => { - const pruned = pruneStaleSessions(); const children = getChildren(); - logger.debug(`[CONTROL SERVER] Listing ${children.length} sessions (pruned ${pruned})`); + logger.debug(`[CONTROL SERVER] Listing ${children.length} sessions`); return { children: children .filter(child => child.happySessionId !== undefined) @@ -100,9 +97,8 @@ export function startDaemonControlServer({ } }, async (request) => { const { sessionId } = request.body; - const pruned = pruneStaleSessions(); - logger.debug(`[CONTROL SERVER] Stop session request: ${sessionId} (pruned ${pruned})`); + logger.debug(`[CONTROL SERVER] Stop session request: ${sessionId}`); const success = stopSession(sessionId); return { success }; }); @@ -212,4 +208,4 @@ export function startDaemonControlServer({ }); }); }); -} +} \ No newline at end of file diff --git a/packages/happy-cli/src/daemon/daemon.integration.test.ts b/packages/happy-cli/src/daemon/daemon.integration.test.ts index cb59698201..9b566c9076 100644 --- a/packages/happy-cli/src/daemon/daemon.integration.test.ts +++ b/packages/happy-cli/src/daemon/daemon.integration.test.ts @@ -112,16 +112,6 @@ describe.skipIf(!await isServerHealthy())('Daemon Integration Tests', { timeout: }); it('should track session-started webhook from terminal session', async () => { - const fakeTerminalProcess = spawn(process.execPath, ['-e', 'setInterval(() => {}, 1000)'], { - detached: true, - stdio: 'ignore' - }); - fakeTerminalProcess.unref(); - - if (!fakeTerminalProcess.pid) { - throw new Error('Failed to spawn fake terminal process'); - } - // Simulate a terminal-started session reporting to daemon const mockMetadata: Metadata = { path: '/test/path', @@ -130,61 +120,21 @@ describe.skipIf(!await isServerHealthy())('Daemon Integration Tests', { timeout: happyHomeDir: '/test/happy-home', happyLibDir: '/test/happy-lib', happyToolsDir: '/test/happy-tools', - hostPid: fakeTerminalProcess.pid, - startedBy: 'terminal', - machineId: 'test-machine-123' - }; - - try { - await notifyDaemonSessionStarted('test-session-123', mockMetadata); - - // Verify session is tracked - const sessions = await listDaemonSessions(); - expect(sessions).toHaveLength(1); - - const tracked = sessions[0]; - expect(tracked.startedBy).toBe('happy directly - likely by user from terminal'); - expect(tracked.happySessionId).toBe('test-session-123'); - expect(tracked.pid).toBe(fakeTerminalProcess.pid); - } finally { - try { - process.kill(fakeTerminalProcess.pid, 'SIGTERM'); - } catch { - // Process may already be gone - } - } - }); - - it('should prune dead terminal sessions immediately when listing sessions', async () => { - const fakeTerminalProcess = spawn(process.execPath, ['-e', 'setInterval(() => {}, 1000)'], { - detached: true, - stdio: 'ignore' - }); - fakeTerminalProcess.unref(); - - if (!fakeTerminalProcess.pid) { - throw new Error('Failed to spawn fake terminal process'); - } - - const mockMetadata: Metadata = { - path: '/test/path', - host: 'test-host', - homeDir: '/test/home', - happyHomeDir: '/test/happy-home', - happyLibDir: '/test/happy-lib', - happyToolsDir: '/test/happy-tools', - hostPid: fakeTerminalProcess.pid, + hostPid: 99999, startedBy: 'terminal', machineId: 'test-machine-123' }; - await notifyDaemonSessionStarted('dead-terminal-session', mockMetadata); - expect(await listDaemonSessions()).toHaveLength(1); + await notifyDaemonSessionStarted('test-session-123', mockMetadata); - process.kill(fakeTerminalProcess.pid, 'SIGTERM'); - await new Promise(resolve => setTimeout(resolve, 200)); - - expect(await listDaemonSessions()).toEqual([]); + // Verify session is tracked + const sessions = await listDaemonSessions(); + expect(sessions).toHaveLength(1); + + const tracked = sessions[0]; + expect(tracked.startedBy).toBe('happy directly - likely by user from terminal'); + expect(tracked.happySessionId).toBe('test-session-123'); + expect(tracked.pid).toBe(99999); }); it('should spawn & stop a session via HTTP (not testing RPC route, but similar enough)', async () => { @@ -520,4 +470,4 @@ describe.skipIf(!await isServerHealthy())('Daemon Integration Tests', { timeout: // TODO: Test npm uninstall scenario - daemon should gracefully handle when happy-coder is uninstalled // Current behavior: daemon tries to spawn new daemon on version mismatch but dist/index.mjs is gone // Expected: daemon should detect missing entrypoint and either exit cleanly or at minimum not respawn infinitely -}); +}); \ No newline at end of file diff --git a/packages/happy-cli/src/daemon/run.ts b/packages/happy-cli/src/daemon/run.ts index 7555762db3..75889d14e9 100644 --- a/packages/happy-cli/src/daemon/run.ts +++ b/packages/happy-cli/src/daemon/run.ts @@ -17,7 +17,6 @@ import { writeDaemonState, DaemonLocallyPersistedState, readDaemonState, acquire import { cleanupDaemonState, isDaemonRunningCurrentlyInstalledHappyVersion, stopDaemon } from './controlClient'; import { startDaemonControlServer } from './controlServer'; -import { pruneStaleTrackedSessions } from './sessionTracking'; import { readFileSync } from 'fs'; import { join } from 'path'; import { projectPath } from '@/projectPath'; @@ -173,13 +172,6 @@ export async function startDaemon(): Promise { // Helper functions const getCurrentChildren = () => Array.from(pidToTrackedSession.values()); - const pruneStaleSessions = () => { - const removed = pruneStaleTrackedSessions(pidToTrackedSession); - if (removed > 0) { - logger.debug(`[DAEMON RUN] Pruned ${removed} stale tracked sessions`); - } - return removed; - }; // Handle webhook from happy session reporting itself const onHappySessionWebhook = (sessionId: string, sessionMetadata: Metadata) => { @@ -605,7 +597,6 @@ export async function startDaemon(): Promise { // Stop a session by sessionId or PID fallback const stopSession = (sessionId: string): boolean => { logger.debug(`[DAEMON RUN] Attempting to stop session ${sessionId}`); - pruneStaleSessions(); // Try to find by sessionId first for (const [pid, session] of pidToTrackedSession.entries()) { @@ -643,13 +634,11 @@ export async function startDaemon(): Promise { const onChildExited = (pid: number) => { logger.debug(`[DAEMON RUN] Removing exited process PID ${pid} from tracking`); pidToTrackedSession.delete(pid); - pidToAwaiter.delete(pid); }; // Start control server const { port: controlPort, stop: stopControlServer } = await startDaemonControlServer({ getChildren: getCurrentChildren, - pruneStaleSessions, stopSession, spawnSession, requestShutdown: () => requestShutdown('happy-cli'), @@ -716,7 +705,17 @@ export async function startDaemon(): Promise { logger.debug(`[DAEMON RUN] Health check started at ${new Date().toLocaleString()}`); } - pruneStaleSessions(); + // Prune stale sessions + for (const [pid, _] of pidToTrackedSession.entries()) { + try { + // Check if process is still alive (signal 0 doesn't kill, just checks) + process.kill(pid, 0); + } catch (error) { + // Process is dead, remove from tracking + logger.debug(`[DAEMON RUN] Removing stale session with PID ${pid} (process no longer exists)`); + pidToTrackedSession.delete(pid); + } + } // Check if daemon needs update // If version on disk is different from the one in package.json - we need to restart diff --git a/packages/happy-cli/src/daemon/sessionTracking.test.ts b/packages/happy-cli/src/daemon/sessionTracking.test.ts deleted file mode 100644 index 15b0ba02c1..0000000000 --- a/packages/happy-cli/src/daemon/sessionTracking.test.ts +++ /dev/null @@ -1,44 +0,0 @@ -import { describe, expect, it, vi } from 'vitest'; - -import { pruneStaleTrackedSessions } from './sessionTracking'; -import { TrackedSession } from './types'; - -function makeSession(pid: number, overrides: Partial = {}): TrackedSession { - return { - pid, - startedBy: 'happy directly - likely by user from terminal', - happySessionId: `session-${pid}`, - ...overrides - }; -} - -describe('pruneStaleTrackedSessions', () => { - it('removes only dead sessions and returns the number pruned', () => { - const tracked = new Map([ - [111, makeSession(111)], - [222, makeSession(222)], - [333, makeSession(333)] - ]); - - const isPidAlive = vi.fn((pid: number) => pid !== 222); - - const removed = pruneStaleTrackedSessions(tracked, isPidAlive); - - expect(removed).toBe(1); - expect(isPidAlive).toHaveBeenCalledTimes(3); - expect(Array.from(tracked.keys())).toEqual([111, 333]); - }); - - it('treats invalid pids as stale when using a custom liveness check', () => { - const tracked = new Map([ - [0, makeSession(0)], - [-1, makeSession(-1)], - [444, makeSession(444)] - ]); - - const removed = pruneStaleTrackedSessions(tracked, (pid) => pid === 444); - - expect(removed).toBe(2); - expect(Array.from(tracked.keys())).toEqual([444]); - }); -}); diff --git a/packages/happy-cli/src/daemon/sessionTracking.ts b/packages/happy-cli/src/daemon/sessionTracking.ts deleted file mode 100644 index e3c0879f73..0000000000 --- a/packages/happy-cli/src/daemon/sessionTracking.ts +++ /dev/null @@ -1,34 +0,0 @@ -import { TrackedSession } from './types'; - -export type IsPidAlive = (pid: number) => boolean; - -export const defaultIsPidAlive: IsPidAlive = (pid) => { - if (!Number.isInteger(pid) || pid <= 0) { - return false; - } - - try { - process.kill(pid, 0); - return true; - } catch (error) { - if (typeof error === 'object' && error !== null && 'code' in error && error.code === 'EPERM') { - return true; - } - return false; - } -}; - -export function pruneStaleTrackedSessions( - pidToTrackedSession: Map, - isPidAlive: IsPidAlive = defaultIsPidAlive -): number { - let removed = 0; - - for (const [pid] of pidToTrackedSession.entries()) { - if (isPidAlive(pid)) continue; - pidToTrackedSession.delete(pid); - removed += 1; - } - - return removed; -} From 940caef577dfc170f149d38dfa97d5343f1497d9 Mon Sep 17 00:00:00 2001 From: KevinChenCodexMini Date: Tue, 10 Mar 2026 10:20:29 +0800 Subject: [PATCH 7/9] Attach Happy UI for codex resume --- packages/happy-cli/src/index.ts | 38 +++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/packages/happy-cli/src/index.ts b/packages/happy-cli/src/index.ts index cebdece2b1..d9c56fa48a 100644 --- a/packages/happy-cli/src/index.ts +++ b/packages/happy-cli/src/index.ts @@ -748,12 +748,14 @@ ${chalk.bold('Options:')} --session-tag Use known session tag directly --home-dir Override metadata.homeDir --happy-home-dir Override metadata.happyHomeDir + --detach Launch restored session in background instead of attaching UI --dry-run Resolve parameters and print without launching -h, --help Show help ${chalk.bold('Examples:')} happy codex resume cmmexample --metadata-file ./metadata.json happy codex resume cmmexample --path /Users/me/project --pid 12345 + happy codex resume cmmexample --metadata-file ./metadata.json --detach `); } @@ -822,6 +824,7 @@ function parseCodexResumeArgs(args: string[]) { let sessionTag: string | null = null let homeDir: string | null = null let happyHomeDir: string | null = null + let detach = false let dryRun = false let showHelp = false @@ -841,6 +844,8 @@ function parseCodexResumeArgs(args: string[]) { homeDir = args[++i] } else if (arg === '--happy-home-dir') { happyHomeDir = args[++i] + } else if (arg === '--detach') { + detach = true } else if (arg === '--dry-run') { dryRun = true } else if (!sessionId) { @@ -858,6 +863,7 @@ function parseCodexResumeArgs(args: string[]) { sessionTag, homeDir, happyHomeDir, + detach, dryRun, showHelp } @@ -935,6 +941,7 @@ async function resolveCodexResumeParameters(args: string[]) { sessionSnapshotPath, tagSnapshotPath, metadata, + detach: parsed.detach, dryRun: parsed.dryRun } } @@ -966,6 +973,7 @@ async function handleCodexResumeCommand(args: string[]): Promise { console.log(` Workdir: ${resolved.workdir}`) console.log(` Home dir: ${resolved.homeDir}`) console.log(` Happy home: ${resolved.happyHomeDir}`) + console.log(` Launch mode: ${resolved.detach ? 'detached' : 'attached'}`) console.log(` Prior log: ${resolved.logPath || 'not found'}`) console.log(` Session snapshot: ${resolved.sessionSnapshotPath}`) console.log(` Tag snapshot: ${resolved.tagSnapshotPath}`) @@ -985,6 +993,36 @@ async function handleCodexResumeCommand(args: string[]): Promise { } catch { } + if (!resolved.detach) { + console.log('Starting restored Happy Codex session in attached mode...') + const child = spawnHappyCLI( + ['codex', '--happy-starting-mode', 'remote', '--started-by', 'daemon'], + { + cwd: resolved.workdir, + stdio: 'inherit', + env + } + ) + + const exitCode = await new Promise((resolve, reject) => { + child.once('error', reject) + child.once('exit', (code, signal) => { + if (typeof code === 'number') { + resolve(code) + return + } + if (signal) { + console.error(`Restored Happy Codex exited with signal ${signal}`) + resolve(1) + return + } + resolve(0) + }) + }) + + process.exit(exitCode) + } + const child = spawnHappyCLI( ['codex', '--happy-starting-mode', 'remote', '--started-by', 'daemon'], { From 5ba0cfef24382be35ff9848e8b97d307c1f21b94 Mon Sep 17 00:00:00 2001 From: KevinChenCodexMini Date: Tue, 10 Mar 2026 12:02:55 +0800 Subject: [PATCH 8/9] Fix codex resume message cursor replay --- packages/happy-cli/src/api/apiSession.test.ts | 28 ++++++++++++++++++- packages/happy-cli/src/api/apiSession.ts | 3 +- 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/packages/happy-cli/src/api/apiSession.test.ts b/packages/happy-cli/src/api/apiSession.test.ts index 9122558495..d74993023a 100644 --- a/packages/happy-cli/src/api/apiSession.test.ts +++ b/packages/happy-cli/src/api/apiSession.test.ts @@ -71,7 +71,14 @@ vi.mock('@/utils/time', () => ({ type SocketHandler = (...args: any[]) => void; type SocketHandlers = Record; -function makeSession() { +function makeSession(overrides: Partial> = {}) { + return { + ...makeSessionBase(), + ...overrides + }; +} + +function makeSessionBase() { return { id: 'test-session-id', seq: 0, @@ -859,6 +866,25 @@ describe('ApiSessionClient v3 messages API migration', () => { expect(mockAxiosGet.mock.calls[0][1].params.after_seq).toBe(0); }); + it('starts receive cursor from the existing session seq on reconnect', async () => { + const resumedSession = makeSession({ seq: 23 }); + new ApiSessionClient('fake-token', resumedSession); + + mockAxiosGet.mockResolvedValueOnce({ + data: { + messages: [], + hasMore: false + } + }); + + emitSocketEvent('connect'); + + await waitForCheck(() => { + expect(mockAxiosGet).toHaveBeenCalledTimes(1); + }); + expect(mockAxiosGet.mock.calls[0][1].params.after_seq).toBe(23); + }); + it('stops send and receive sync loops on close', async () => { const client = new ApiSessionClient('fake-token', session); await client.close(); diff --git a/packages/happy-cli/src/api/apiSession.ts b/packages/happy-cli/src/api/apiSession.ts index f8dbf61cdb..a48c565f36 100644 --- a/packages/happy-cli/src/api/apiSession.ts +++ b/packages/happy-cli/src/api/apiSession.ts @@ -97,7 +97,7 @@ export class ApiSessionClient extends EventEmitter { startedSubagents: new Set(), activeSubagents: new Set(), }; - private lastSeq = 0; + private lastSeq: number; private pendingOutbox: Array<{ content: string; localId: string }> = []; private readonly sendSync: InvalidateSync; private readonly receiveSync: InvalidateSync; @@ -112,6 +112,7 @@ export class ApiSessionClient extends EventEmitter { this.agentStateVersion = session.agentStateVersion; this.encryptionKey = session.encryptionKey; this.encryptionVariant = session.encryptionVariant; + this.lastSeq = Math.max(0, session.seq); this.sendSync = new InvalidateSync(() => this.flushOutbox()); this.receiveSync = new InvalidateSync(() => this.fetchMessages()); From 67a1c933fa31373552ffc642a40fd1bfbc425389 Mon Sep 17 00:00:00 2001 From: KevinChenCodexMini Date: Tue, 10 Mar 2026 13:58:33 +0800 Subject: [PATCH 9/9] Prevent resume replaying old Codex prompts --- .../__tests__/resumeReplayFilter.test.ts | 25 ++++ .../src/codex/codexAppServerClient.ts | 10 ++ packages/happy-cli/src/codex/runCodex.ts | 135 +++++++++++++++--- 3 files changed, 147 insertions(+), 23 deletions(-) create mode 100644 packages/happy-cli/src/codex/__tests__/resumeReplayFilter.test.ts diff --git a/packages/happy-cli/src/codex/__tests__/resumeReplayFilter.test.ts b/packages/happy-cli/src/codex/__tests__/resumeReplayFilter.test.ts new file mode 100644 index 0000000000..04f61ebe11 --- /dev/null +++ b/packages/happy-cli/src/codex/__tests__/resumeReplayFilter.test.ts @@ -0,0 +1,25 @@ +import { describe, expect, it } from 'vitest'; +import { filterBufferedResumeMessages, normalizeResumeUserText } from '../runCodex'; + +describe('resume replay filtering', () => { + it('normalizes resume user text', () => { + expect(normalizeResumeUserText(' hello\r\nworld ')).toBe('hello\nworld'); + }); + + it('drops buffered messages already present in the saved transcript', () => { + const recentResumeUserTexts = new Set([ + normalizeResumeUserText('记忆测试: 大象在冰箱里\n只回复 ACK1'), + normalizeResumeUserText('测试: 大象在哪里?\n只回复 ACK2:冰箱里'), + ]); + + const buffered = [ + { text: ' 记忆测试: 大象在冰箱里\r\n只回复 ACK1 ', mode: 'same' }, + { text: '测试: 大象在哪里?\n只回复 ACK2:冰箱里', mode: 'same' }, + { text: '测试: 小兔子在哪里?\n只回复 NEW', mode: 'same' }, + ]; + + expect(filterBufferedResumeMessages(buffered, recentResumeUserTexts)).toEqual([ + { text: '测试: 小兔子在哪里?\n只回复 NEW', mode: 'same' }, + ]); + }); +}); diff --git a/packages/happy-cli/src/codex/codexAppServerClient.ts b/packages/happy-cli/src/codex/codexAppServerClient.ts index 9468e81465..575b6bda0c 100644 --- a/packages/happy-cli/src/codex/codexAppServerClient.ts +++ b/packages/happy-cli/src/codex/codexAppServerClient.ts @@ -156,6 +156,16 @@ export class CodexAppServerClient { return this.sessionId; } + async resumeSavedThread( + config?: Partial, + options?: { timeout?: number; signal?: AbortSignal } + ): Promise { + if (!this.connected) { + await this.connect(); + } + await this.ensureThreadResumed(config, options); + } + async forceCloseSession(): Promise { logger.debug('[CodexAppServer] Force closing session'); try { diff --git a/packages/happy-cli/src/codex/runCodex.ts b/packages/happy-cli/src/codex/runCodex.ts index b2d119665e..1e8e7cd049 100644 --- a/packages/happy-cli/src/codex/runCodex.ts +++ b/packages/happy-cli/src/codex/runCodex.ts @@ -64,6 +64,19 @@ export function emitReadyIfIdle({ pending, queueSize, shouldExit, sendReady, not return true; } +export function normalizeResumeUserText(text: string): string { + return text.replace(/\r\n/g, '\n').trim(); +} + +export function filterBufferedResumeMessages( + messages: T[], + recentResumeUserTexts: Set, +): T[] { + return messages.filter(({ text }) => ( + !recentResumeUserTexts.has(normalizeResumeUserText(text)) + )); +} + /** * Main entry point for the codex command with ink UI */ @@ -327,6 +340,30 @@ export async function runCodex(opts: { // Use shared PermissionMode type from api/types for cross-agent compatibility let currentPermissionMode: import('@/api/types').PermissionMode | undefined = undefined; let currentModel: string | undefined = undefined; + let resumeBootstrapPending = false; + let recentResumeUserTexts = new Set(); + const bufferedResumeMessages: Array<{ text: string; mode: EnhancedMode }> = []; + + function flushBufferedResumeMessages() { + if (!bufferedResumeMessages.length) { + return; + } + + const pendingMessages = bufferedResumeMessages.splice(0); + const filteredMessages = filterBufferedResumeMessages(pendingMessages, recentResumeUserTexts); + const droppedCount = pendingMessages.length - filteredMessages.length; + + if (droppedCount > 0) { + logger.debug('[Codex] Dropped duplicate pre-resume user messages', { + droppedCount, + keptCount: filteredMessages.length + }); + } + + for (const { text, mode } of filteredMessages) { + messageQueue.push(text, mode); + } + } session.onUserMessage((message) => { // Resolve permission mode (accept all modes, will be mapped in switch statement) @@ -353,6 +390,14 @@ export async function runCodex(opts: { permissionMode: messagePermissionMode || 'default', model: messageModel, }; + if (resumeBootstrapPending) { + logger.debug('[Codex] Buffering user message until saved Codex thread resume completes'); + bufferedResumeMessages.push({ + text: message.content.text, + mode: enhancedMode + }); + return; + } messageQueue.push(message.content.text, enhancedMode); }); let thinking = false; @@ -571,8 +616,8 @@ export async function runCodex(opts: { } } - function buildResumePromptContext(resumeFile: string | null): string { - if (!resumeFile) return ''; + function readResumeTranscriptMessages(resumeFile: string | null): Array<{ role: 'User' | 'Assistant'; text: string }> { + if (!resumeFile) return []; try { const raw = fs.readFileSync(resumeFile, 'utf8'); const lines = raw.split('\n').filter((line) => line.trim().length > 0); @@ -622,37 +667,53 @@ export async function runCodex(opts: { messages.push({ role, text }); } - if (!messages.length) { - return ''; - } - - const selected: string[] = []; - let totalChars = 0; - for (let i = messages.length - 1; i >= 0; i -= 1) { - const formatted = `${messages[i].role}: ${messages[i].text}`; - if (selected.length >= 24 || totalChars + formatted.length > 12000) { - break; - } - selected.push(formatted); - totalChars += formatted.length + 2; - } - selected.reverse(); - - return [ - 'Restored context from the previous unavailable Codex thread.', - 'Treat the following transcript as prior conversation state and preserved memory.', - ...selected - ].join('\n\n'); + return messages; } catch (error) { logger.debug('[Codex] Failed to build resume prompt context', error); + return []; + } + } + + function buildRecentResumeUserTexts(resumeFile: string | null): Set { + const messages = readResumeTranscriptMessages(resumeFile); + const recentUsers = messages + .filter((message) => message.role === 'User') + .slice(-24) + .map((message) => normalizeResumeUserText(message.text)) + .filter(Boolean); + return new Set(recentUsers); + } + + function buildResumePromptContext(resumeFile: string | null): string { + const messages = readResumeTranscriptMessages(resumeFile); + if (!messages.length) { return ''; } + + const selected: string[] = []; + let totalChars = 0; + for (let i = messages.length - 1; i >= 0; i -= 1) { + const formatted = `${messages[i].role}: ${messages[i].text}`; + if (selected.length >= 24 || totalChars + formatted.length > 12000) { + break; + } + selected.push(formatted); + totalChars += formatted.length + 2; + } + selected.reverse(); + + return [ + 'Restored context from the previous unavailable Codex thread.', + 'Treat the following transcript as prior conversation state and preserved memory.', + ...selected + ].join('\n\n'); } if (currentCodexSessionId) { bootResumeFile = findCodexResumeFile(currentCodexSessionId); if (bootResumeFile) { logger.debug('[Codex] Found resume file from session snapshot:', bootResumeFile); + recentResumeUserTexts = buildRecentResumeUserTexts(bootResumeFile); } else { logger.debug(`[Codex] No resume file found for snapshot codex session ${currentCodexSessionId}`); } @@ -807,8 +868,36 @@ export async function runCodex(opts: { if (currentCodexSessionId || currentCodexConversationId) { client.seedSessionIdentifiers(currentCodexSessionId, currentCodexConversationId); wasCreated = true; + resumeBootstrapPending = true; logger.debug('[Codex] Seeded client with saved identifiers; first turn will attempt continueSession'); messageBuffer.addMessage('Attempting to resume saved Codex thread...', 'status'); + if (client instanceof CodexAppServerClient) { + const bootstrapExecutionPolicy = resolveCodexExecutionPolicy( + currentPermissionMode ?? 'default', + client.sandboxEnabled, + ); + const bootstrapConfig: Partial = { + sandbox: bootstrapExecutionPolicy.sandbox, + 'approval-policy': bootstrapExecutionPolicy.approvalPolicy, + }; + if (currentModel) { + bootstrapConfig.model = currentModel; + } + try { + await client.resumeSavedThread( + bootstrapConfig, + { signal: abortController.signal } + ); + } catch (error) { + logger.debug('[Codex] Initial saved-thread resume bootstrap failed', error); + } finally { + resumeBootstrapPending = false; + flushBufferedResumeMessages(); + } + } else { + resumeBootstrapPending = false; + flushBufferedResumeMessages(); + } } let currentModeHash: string | null = null; let pending: { message: string; mode: EnhancedMode; isolate: boolean; hash: string } | null = null;