diff --git a/docs/happy-codex-resume-fork.md b/docs/happy-codex-resume-fork.md new file mode 100644 index 0000000000..3c7a4e12ae --- /dev/null +++ b/docs/happy-codex-resume-fork.md @@ -0,0 +1,239 @@ +# Happy Codex Resume Fork + +## 概述 + +这个 fork 用来承载 `Happy Codex` 的会话恢复能力增强,目标是让已经关闭的 Happy Codex 会话可以在重新拉起后,尽量保留原来的对话记忆和上下文,而不是只恢复一个空壳会话。 + +当前 fork 信息: + +- Fork 仓库:`https://github.com/KevinCJM/happy` +- 工作分支:`codex/happy-codex-resume` +- 变更提交:`c81bbf1c80ef4b007b144c80c3438be5aa1c2a1e` + +建议用途: + +- 作为你自己的可维护分支继续迭代 +- 作为向上游发 PR 的基础 +- 作为给其他人复现 Happy Codex resume 方案的参考实现 + +## 这个 fork 解决了什么问题 + +原始问题主要有三类: + +1. Happy 会话恢复后在线,但 App 发消息没有响应 +2. 恢复后没有原来的对话记忆和上下文 +3. 需要手工拼环境变量和日志参数,恢复流程不稳定 + +这个 fork 解决后的行为是: + +- 恢复时复用旧 Happy 会话加密 key +- 恢复时读取并保存 Codex session/conversation 标识 +- 恢复路径优先走 Codex 原生 `app-server` 的 `thread/resume` +- 如果旧 thread 不可直接继续,则回退到本地 transcript 恢复 +- 提供正式命令:`happy codex resume` + +## 核心变更 + +### 1. 新增 CLI 命令 + +新增命令: + +```bash +happy codex resume --metadata-file +``` + +也支持: + +```bash +happy codex resume --path --pid +``` + +命令职责: + +- 从 metadata 和本地日志解析恢复参数 +- 自动定位 `sessionTag` +- 自动检查本地恢复快照 +- 自动停掉同一 Happy 会话的旧进程 +- 自动拉起恢复后的 Happy Codex 进程 + +实现入口: + +- `packages/happy-cli/src/index.ts` + +### 2. 修复恢复时的加密 key 复用 + +恢复模式下,如果 Happy 会话重新生成新的 `dataKey`,App 端已有消息就会解密失败,表现为在线但没回复。 + +当前实现改为: + +- 读取 `~/.happy-session-crypto/session-.json` +- 或 `~/.happy-session-crypto/tag-.json` +- 如果找到旧 `dataKey`,直接复用 + +这样恢复后的进程与原 Happy 会话仍然使用同一把会话 key。 + +实现位置: + +- `packages/happy-cli/src/api/api.ts` + +### 3. 增加 Happy 会话快照 + +当前分支会把以下内容持久化到本地快照: + +- `sessionId` +- `sessionTag` +- `encryptionVariant` +- `encryptionKeyBase64` +- `codexSessionId` +- `codexConversationId` + +作用: + +- 下次恢复时不用只靠旧日志 +- 能直接找到原会话对应的 Codex 标识 +- 允许在 Happy 服务端不可达时回退到本地快照恢复 + +实现位置: + +- `packages/happy-cli/src/codex/runCodex.ts` + +### 4. 恢复时优先走 Codex 原生 app-server + +普通新会话仍然走现有 MCP 路径。 +恢复场景则切到新的原生恢复客户端,直接调用: + +- `thread/resume` +- `turn/start` +- `turn/interrupt` + +这是当前恢复原上下文最关键的改动。 + +实现位置: + +- `packages/happy-cli/src/codex/codexAppServerClient.ts` +- `packages/happy-cli/src/codex/runCodex.ts` + +### 5. 旧 thread 无法继续时的回退策略 + +如果旧 Codex thread 不能直接续接,这个 fork 不会直接失败,而是: + +1. 从本地 `~/.codex/sessions` 查找 rollout 文件 +2. 提取最近的用户/助手对话 +3. 把 transcript 作为恢复上下文注入新 session +4. 同时继续使用 `experimental_resume` + +这一步不能保证“云端原 thread 原样复活”,但能最大化保留本地已有记忆和工作上下文。 + +实现位置: + +- `packages/happy-cli/src/codex/runCodex.ts` + +## 变更文件 + +本 fork 当前主要改动文件: + +- `packages/happy-cli/src/index.ts` +- `packages/happy-cli/src/api/api.ts` +- `packages/happy-cli/src/codex/runCodex.ts` +- `packages/happy-cli/src/codex/codexMcpClient.ts` +- `packages/happy-cli/src/codex/codexAppServerClient.ts` + +## 如何获取这个 fork + +如果是在另一台机器上复现: + +```bash +git clone https://github.com/KevinCJM/happy.git +cd happy +git checkout codex/happy-codex-resume +``` + +如果已经有上游仓库本地副本: + +```bash +git remote add fork https://github.com/KevinCJM/happy.git +git fetch fork +git switch -c codex/happy-codex-resume --track fork/codex/happy-codex-resume +``` + +## 如何构建 + +在仓库根目录执行: + +```bash +npx --yes yarn@1.22.22 install +npx --yes yarn@1.22.22 workspace happy-coder build +``` + +## 如何验证 + +### 验证命令是否接入 + +```bash +cd packages/happy-cli +node ./dist/index.mjs codex resume --help +``` + +### 验证恢复逻辑 + +前提: + +- 目标 Happy session 的本地快照仍在 +- 对应的本地 Codex rollout 文件仍在 +- 当前机器仍是原机器 + +示例: + +```bash +happy codex resume --metadata-file ./metadata.json +``` + +重点观察: + +- 进程是否成功启动 +- App 端是否能正常发消息 +- 恢复后的回答是否带有原来的记忆上下文 + +## 向上游发 PR + +当前 fork 分支已经可以直接用于 PR。 + +PR 创建入口: + +- `https://github.com/KevinCJM/happy/pull/new/codex/happy-codex-resume` + +如果你要提交到上游 `slopus/happy`,建议 PR 标题可以用: + +```text +Add Happy Codex resume workflow +``` + +建议 PR 描述至少包含: + +- 新增 `happy codex resume` +- 恢复时复用会话加密 key +- 保存并恢复 Codex session identifiers +- restore 场景优先使用原生 `app-server thread/resume` +- fallback 到本地 transcript/context 恢复 + +## 当前限制 + +这个 fork 的目标是“恢复之前的对话记忆和上下文”,不是保证“服务端原 thread 永久可续接”。 + +需要明确两点: + +1. 旧 Codex thread 可能已经不可继续 +2. 即使旧 thread 不可继续,本 fork 仍会尽量利用本地 transcript 恢复上下文 + +也就是说,这个方案优先保证“恢复体验”,不是强依赖旧服务端 thread 一定存在。 + +## 结论 + +这个 fork 已经把 Happy Codex 的恢复流程从“手工调试级别”推进到了“可命令化、可重复、可分享”的状态。 + +对外可以这样描述这个分支: + +- 它增加了 `happy codex resume` +- 它能恢复 Happy 会话层加密上下文 +- 它能尽量恢复原来的 Codex 对话记忆和工作上下文 +- 它在旧 thread 失效时仍然有本地 transcript fallback diff --git a/packages/happy-cli/src/api/api.ts b/packages/happy-cli/src/api/api.ts index fc38118043..e85bbf12c8 100644 --- a/packages/happy-cli/src/api/api.ts +++ b/packages/happy-cli/src/api/api.ts @@ -9,6 +9,9 @@ import { configuration } from '@/configuration'; import chalk from 'chalk'; import { Credentials } from '@/persistence'; import { connectionState, isNetworkError } from '@/utils/serverConnectionErrors'; +import { existsSync, readFileSync } from 'node:fs'; +import { homedir } from 'node:os'; +import { join } from 'node:path'; export class ApiClient { @@ -33,15 +36,66 @@ export class ApiClient { state: AgentState | null }): Promise { + const loadRestoreSessionSnapshot = (): { encryptionKey: Uint8Array; encryptionVariant: 'dataKey' } | null => { + const restoreSessionId = process.env.HAPPY_RESTORE_SESSION_ID; + const restoreSessionTag = process.env.HAPPY_RESTORE_SESSION_TAG || process.env.HAPPY_SESSION_TAG_OVERRIDE; + if (!restoreSessionId && !restoreSessionTag) { + return null; + } + + const candidates: string[] = []; + if (restoreSessionId) { + candidates.push(join(homedir(), '.happy-session-crypto', `session-${restoreSessionId}.json`)); + } + if (restoreSessionTag) { + candidates.push(join(homedir(), '.happy-session-crypto', `tag-${restoreSessionTag}.json`)); + } + + for (const candidatePath of candidates) { + try { + if (!existsSync(candidatePath)) { + continue; + } + const parsed = JSON.parse(readFileSync(candidatePath, 'utf8')); + if (parsed?.encryptionVariant !== 'dataKey' || !parsed?.encryptionKeyBase64) { + continue; + } + const restoredKey = Uint8Array.from(Buffer.from(parsed.encryptionKeyBase64, 'base64')); + if (restoredKey.length !== 32) { + continue; + } + logger.debug('[SessionCrypto] Reusing saved session encryption key for restore', { + source: candidatePath, + sessionId: parsed.sessionId, + encryptionVariant: parsed.encryptionVariant + }); + return { + encryptionKey: restoredKey, + encryptionVariant: parsed.encryptionVariant + }; + } catch (error) { + logger.debug(`[SessionCrypto] Failed loading restore snapshot from ${candidatePath}`, error); + } + } + + return null; + }; + + const restoreSnapshot = loadRestoreSessionSnapshot(); + // Resolve encryption key let dataEncryptionKey: Uint8Array | null = null; let encryptionKey: Uint8Array; let encryptionVariant: 'legacy' | 'dataKey'; if (this.credential.encryption.type === 'dataKey') { - - // Generate new encryption key - encryptionKey = getRandomBytes(32); - encryptionVariant = 'dataKey'; + if (restoreSnapshot) { + encryptionKey = restoreSnapshot.encryptionKey; + encryptionVariant = restoreSnapshot.encryptionVariant; + } else { + // Generate new encryption key + encryptionKey = getRandomBytes(32); + encryptionVariant = 'dataKey'; + } // Derive and encrypt data encryption key // const contentDataKey = await deriveKey(this.secret, 'Happy EnCoder', ['content']); diff --git a/packages/happy-cli/src/api/apiSession.test.ts b/packages/happy-cli/src/api/apiSession.test.ts index 9122558495..d74993023a 100644 --- a/packages/happy-cli/src/api/apiSession.test.ts +++ b/packages/happy-cli/src/api/apiSession.test.ts @@ -71,7 +71,14 @@ vi.mock('@/utils/time', () => ({ type SocketHandler = (...args: any[]) => void; type SocketHandlers = Record; -function makeSession() { +function makeSession(overrides: Partial> = {}) { + return { + ...makeSessionBase(), + ...overrides + }; +} + +function makeSessionBase() { return { id: 'test-session-id', seq: 0, @@ -859,6 +866,25 @@ describe('ApiSessionClient v3 messages API migration', () => { expect(mockAxiosGet.mock.calls[0][1].params.after_seq).toBe(0); }); + it('starts receive cursor from the existing session seq on reconnect', async () => { + const resumedSession = makeSession({ seq: 23 }); + new ApiSessionClient('fake-token', resumedSession); + + mockAxiosGet.mockResolvedValueOnce({ + data: { + messages: [], + hasMore: false + } + }); + + emitSocketEvent('connect'); + + await waitForCheck(() => { + expect(mockAxiosGet).toHaveBeenCalledTimes(1); + }); + expect(mockAxiosGet.mock.calls[0][1].params.after_seq).toBe(23); + }); + it('stops send and receive sync loops on close', async () => { const client = new ApiSessionClient('fake-token', session); await client.close(); diff --git a/packages/happy-cli/src/api/apiSession.ts b/packages/happy-cli/src/api/apiSession.ts index f8dbf61cdb..a48c565f36 100644 --- a/packages/happy-cli/src/api/apiSession.ts +++ b/packages/happy-cli/src/api/apiSession.ts @@ -97,7 +97,7 @@ export class ApiSessionClient extends EventEmitter { startedSubagents: new Set(), activeSubagents: new Set(), }; - private lastSeq = 0; + private lastSeq: number; private pendingOutbox: Array<{ content: string; localId: string }> = []; private readonly sendSync: InvalidateSync; private readonly receiveSync: InvalidateSync; @@ -112,6 +112,7 @@ export class ApiSessionClient extends EventEmitter { this.agentStateVersion = session.agentStateVersion; this.encryptionKey = session.encryptionKey; this.encryptionVariant = session.encryptionVariant; + this.lastSeq = Math.max(0, session.seq); this.sendSync = new InvalidateSync(() => this.flushOutbox()); this.receiveSync = new InvalidateSync(() => this.fetchMessages()); diff --git a/packages/happy-cli/src/codex/__tests__/resumeReplayFilter.test.ts b/packages/happy-cli/src/codex/__tests__/resumeReplayFilter.test.ts new file mode 100644 index 0000000000..04f61ebe11 --- /dev/null +++ b/packages/happy-cli/src/codex/__tests__/resumeReplayFilter.test.ts @@ -0,0 +1,25 @@ +import { describe, expect, it } from 'vitest'; +import { filterBufferedResumeMessages, normalizeResumeUserText } from '../runCodex'; + +describe('resume replay filtering', () => { + it('normalizes resume user text', () => { + expect(normalizeResumeUserText(' hello\r\nworld ')).toBe('hello\nworld'); + }); + + it('drops buffered messages already present in the saved transcript', () => { + const recentResumeUserTexts = new Set([ + normalizeResumeUserText('记忆测试: 大象在冰箱里\n只回复 ACK1'), + normalizeResumeUserText('测试: 大象在哪里?\n只回复 ACK2:冰箱里'), + ]); + + const buffered = [ + { text: ' 记忆测试: 大象在冰箱里\r\n只回复 ACK1 ', mode: 'same' }, + { text: '测试: 大象在哪里?\n只回复 ACK2:冰箱里', mode: 'same' }, + { text: '测试: 小兔子在哪里?\n只回复 NEW', mode: 'same' }, + ]; + + expect(filterBufferedResumeMessages(buffered, recentResumeUserTexts)).toEqual([ + { text: '测试: 小兔子在哪里?\n只回复 NEW', mode: 'same' }, + ]); + }); +}); diff --git a/packages/happy-cli/src/codex/codexAppServerClient.ts b/packages/happy-cli/src/codex/codexAppServerClient.ts new file mode 100644 index 0000000000..575b6bda0c --- /dev/null +++ b/packages/happy-cli/src/codex/codexAppServerClient.ts @@ -0,0 +1,720 @@ +import { spawn, execSync, type ChildProcessWithoutNullStreams } from 'node:child_process'; +import { randomUUID } from 'node:crypto'; +import { logger } from '@/ui/logger'; +import type { CodexPermissionHandler } from './utils/permissionHandler'; +import type { CodexSessionConfig, CodexToolResponse } from './types'; + +const DEFAULT_TIMEOUT = 14 * 24 * 60 * 60 * 1000; + +function createAbortError(): Error { + const error = new Error('The operation was aborted'); + error.name = 'AbortError'; + return error; +} + +function mapPermissionResultToApprovalDecision(result: any, kind: 'command' | 'file'): any { + if (result?.decision === 'approved_for_session') { + return 'acceptForSession'; + } + if (result?.decision === 'approved_execpolicy_amendment' && kind === 'command' && result.execPolicyAmendment?.command?.length) { + return { + acceptWithExecpolicyAmendment: { + execpolicy_amendment: result.execPolicyAmendment.command + } + }; + } + if (result?.decision === 'abort') { + return 'cancel'; + } + if (result?.decision === 'approved' || result?.decision === 'approved_execpolicy_amendment') { + return 'accept'; + } + return 'decline'; +} + +function mapSandboxModeToPolicy(mode: CodexSessionConfig['sandbox']) { + switch (mode) { + case 'read-only': + return { + type: 'readOnly', + access: { type: 'fullAccess' }, + networkAccess: false + }; + case 'danger-full-access': + return { + type: 'dangerFullAccess' + }; + case 'workspace-write': + default: + return { + type: 'workspaceWrite', + writableRoots: [], + readOnlyAccess: { type: 'fullAccess' }, + networkAccess: false, + excludeTmpdirEnvVar: false, + excludeSlashTmp: false + }; + } +} + +function stringifyError(error: unknown): unknown { + if (error instanceof Error) { + return { + name: error.name, + message: error.message, + stack: error.stack + }; + } + return error; +} + +function ensureCodexCliAvailable(): void { + try { + execSync('codex --version', { stdio: 'ignore' }); + } catch { + throw new Error( + 'Codex CLI not found or not executable.\n' + + '\n' + + 'To install codex:\n' + + ' npm install -g @openai/codex\n' + + '\n' + + 'Alternatively, use Claude:\n' + + ' happy claude' + ); + } +} + +type PendingRequest = { + resolve: (value: any) => void; + reject: (error: Error) => void; + timer: NodeJS.Timeout; +}; + +type PendingTurn = { + resolve: (value: CodexToolResponse & { structuredContent?: any }) => void; + reject: (error: Error) => void; + timer: NodeJS.Timeout; +}; + +export class CodexAppServerClient { + private child: ChildProcessWithoutNullStreams | null = null; + private connected = false; + private sessionId: string | null = null; + private conversationId: string | null = null; + private handler: ((event: any) => void) | null = null; + private permissionHandler: CodexPermissionHandler | null = null; + private requestCounter = 0; + private stdoutBuffer = ''; + private pendingRequests = new Map(); + private pendingTurns = new Map(); + private lastAgentMessages = new Map(); + private resumed = false; + private threadConfig: Partial = {}; + + public sandboxEnabled = false; + + setHandler(handler: ((event: any) => void) | null): void { + this.handler = handler; + } + + setPermissionHandler(handler: CodexPermissionHandler): void { + this.permissionHandler = handler; + } + + seedSessionIdentifiers(sessionId: string | null, conversationId: string | null): void { + const normalizedSessionId = typeof sessionId === 'string' && sessionId.length > 0 ? sessionId : null; + const normalizedConversationId = + typeof conversationId === 'string' && conversationId.length > 0 ? conversationId : normalizedSessionId; + this.sessionId = normalizedSessionId; + this.conversationId = normalizedConversationId; + this.resumed = false; + logger.debug('[CodexAppServer] Session identifiers seeded', { + sessionId: this.sessionId, + conversationId: this.conversationId + }); + } + + getSessionId(): string | null { + return this.sessionId; + } + + hasActiveSession(): boolean { + return this.sessionId !== null; + } + + clearSession(): void { + const previousSessionId = this.sessionId; + this.sessionId = null; + this.conversationId = null; + this.resumed = false; + this.threadConfig = {}; + logger.debug('[CodexAppServer] Session cleared, previous sessionId:', previousSessionId); + } + + storeSessionForResume(): string | null { + logger.debug('[CodexAppServer] Storing session for potential resume:', this.sessionId); + return this.sessionId; + } + + async resumeSavedThread( + config?: Partial, + options?: { timeout?: number; signal?: AbortSignal } + ): Promise { + if (!this.connected) { + await this.connect(); + } + await this.ensureThreadResumed(config, options); + } + + async forceCloseSession(): Promise { + logger.debug('[CodexAppServer] Force closing session'); + try { + await this.disconnect(); + } finally { + this.clearSession(); + } + logger.debug('[CodexAppServer] Session force-closed'); + } + + async disconnect(): Promise { + if (!this.child) { + this.connected = false; + return; + } + + const child = this.child; + this.child = null; + this.connected = false; + + try { + child.kill('SIGTERM'); + } catch { + } + + await new Promise((resolve) => { + const timer = setTimeout(() => resolve(), 500); + child.once('exit', () => { + clearTimeout(timer); + resolve(); + }); + }); + } + + async connect(): Promise { + if (this.connected) return; + + ensureCodexCliAvailable(); + logger.debug('[CodexAppServer] Connecting using command: codex app-server --listen stdio://'); + + const transportEnv = Object.keys(process.env).reduce((acc, key) => { + const value = process.env[key]; + if (typeof value === 'string') { + acc[key] = value; + } + return acc; + }, {} as Record); + + this.child = spawn('codex', ['app-server', '--listen', 'stdio://'], { + env: transportEnv, + stdio: ['pipe', 'pipe', 'pipe'] + }); + + this.child.stdout.on('data', (chunk) => { + this.handleStdoutChunk(chunk.toString('utf8')); + }); + + this.child.stderr.on('data', (chunk) => { + const text = chunk.toString('utf8').trim(); + if (text) { + logger.debug('[CodexAppServer][stderr]', text); + } + }); + + this.child.on('exit', (code, signal) => { + logger.debug('[CodexAppServer] Process exited', { code, signal }); + const error = new Error(`Codex app-server exited (code=${code ?? 'null'}, signal=${signal ?? 'null'})`); + for (const { reject, timer } of this.pendingRequests.values()) { + clearTimeout(timer); + reject(error); + } + this.pendingRequests.clear(); + for (const { reject, timer } of this.pendingTurns.values()) { + clearTimeout(timer); + reject(error); + } + this.pendingTurns.clear(); + this.connected = false; + this.child = null; + }); + + this.child.on('error', (error) => { + logger.debug('[CodexAppServer] Process error', stringifyError(error)); + }); + + await this.request('initialize', { + clientInfo: { + name: 'happy-codex-app-server-client', + version: '1.0.0' + }, + capabilities: { + experimentalApi: true + } + }); + + this.connected = true; + logger.debug('[CodexAppServer] Connected'); + } + + private handleStdoutChunk(chunk: string): void { + this.stdoutBuffer += chunk; + + while (true) { + const newlineIndex = this.stdoutBuffer.indexOf('\n'); + if (newlineIndex === -1) { + break; + } + const line = this.stdoutBuffer.slice(0, newlineIndex).trim(); + this.stdoutBuffer = this.stdoutBuffer.slice(newlineIndex + 1); + if (!line) { + continue; + } + + try { + const message = JSON.parse(line); + this.dispatchMessage(message); + } catch (error) { + logger.debug('[CodexAppServer] Failed to parse stdout line', { line, error: stringifyError(error) }); + } + } + } + + private dispatchMessage(message: any): void { + if (message && typeof message === 'object' && 'id' in message && ('result' in message || 'error' in message)) { + const pending = this.pendingRequests.get(message.id); + if (!pending) { + return; + } + clearTimeout(pending.timer); + this.pendingRequests.delete(message.id); + if ('error' in message) { + pending.reject(new Error(message.error?.message || 'App server request failed')); + } else { + pending.resolve(message.result); + } + return; + } + + if (message && typeof message === 'object' && 'id' in message && 'method' in message) { + void this.handleServerRequest(message); + return; + } + + if (message && typeof message === 'object' && 'method' in message) { + this.handleNotification(message); + } + } + + private send(message: unknown): void { + if (!this.child?.stdin) { + throw new Error('Codex app-server stdin is not available'); + } + this.child.stdin.write(`${JSON.stringify(message)}\n`); + } + + private request(method: string, params: unknown, options?: { timeout?: number; signal?: AbortSignal }): Promise { + const id = `${method}-${++this.requestCounter}-${randomUUID()}`; + const timeoutMs = options?.timeout ?? DEFAULT_TIMEOUT; + + if (options?.signal?.aborted) { + return Promise.reject(createAbortError()); + } + + return new Promise((resolve, reject) => { + let settled = false; + + const cleanupAbort = () => { + options?.signal?.removeEventListener('abort', onAbort); + }; + + const settleReject = (error: Error) => { + if (settled) { + return; + } + settled = true; + cleanupAbort(); + clearTimeout(timer); + this.pendingRequests.delete(id); + reject(error); + }; + + const settleResolve = (value: unknown) => { + if (settled) { + return; + } + settled = true; + cleanupAbort(); + clearTimeout(timer); + this.pendingRequests.delete(id); + resolve(value); + }; + + const onAbort = () => { + settleReject(createAbortError()); + }; + + const timer = setTimeout(() => { + settleReject(new Error(`Codex app-server request timed out: ${method}`)); + }, timeoutMs); + + if (options?.signal) { + options.signal.addEventListener('abort', onAbort, { once: true }); + } + + this.pendingRequests.set(id, { resolve: settleResolve, reject: settleReject, timer }); + this.send({ + id, + method, + params + }); + }); + } + + private async handleServerRequest(message: any): Promise { + try { + let result: unknown; + switch (message.method) { + case 'item/commandExecution/requestApproval': + result = await this.handleCommandApproval(message.params); + break; + case 'item/fileChange/requestApproval': + result = await this.handleFileChangeApproval(message.params); + break; + case 'item/tool/call': + result = { + success: false, + contentItems: [ + { + type: 'inputText', + text: `Dynamic tool calls are not supported by Happy's app-server restore path: ${message.params?.tool || 'unknown'}` + } + ] + }; + break; + case 'item/tool/requestUserInput': + result = { answers: {} }; + break; + case 'mcpServer/elicitation/request': + result = { action: 'decline' }; + break; + default: + throw new Error(`Unsupported app-server request method: ${message.method}`); + } + + this.send({ + id: message.id, + result + }); + } catch (error) { + logger.debug('[CodexAppServer] Error handling server request', { + method: message?.method, + error: stringifyError(error) + }); + this.send({ + id: message.id, + error: { + code: -32000, + message: error instanceof Error ? error.message : 'Unknown app-server request error' + } + }); + } + } + + private async handleCommandApproval(params: any): Promise<{ decision: any }> { + if (!this.permissionHandler) { + return { decision: 'decline' }; + } + + const permissionId = String(params.approvalId ?? params.itemId ?? randomUUID()); + const command = Array.isArray(params.command) + ? params.command.join(' ') + : typeof params.command === 'string' + ? params.command + : ''; + const result = await this.permissionHandler.handleToolCall(permissionId, 'CodexBash', { + command, + cwd: params.cwd, + reason: params.reason, + commandActions: params.commandActions, + proposedExecpolicyAmendment: params.proposedExecpolicyAmendment + }); + + return { + decision: mapPermissionResultToApprovalDecision(result, 'command') + }; + } + + private async handleFileChangeApproval(params: any): Promise<{ decision: any }> { + if (!this.permissionHandler) { + return { decision: 'decline' }; + } + const permissionId = String(params.itemId ?? randomUUID()); + const result = await this.permissionHandler.handleToolCall(permissionId, 'CodexPatch', params); + return { + decision: mapPermissionResultToApprovalDecision(result, 'file') + }; + } + + private handleNotification(notification: any): void { + if (notification.method === 'turn/completed') { + this.resolveTurn(notification.params?.turn?.id || notification.params?.turnId); + return; + } + + if (!notification.method?.startsWith('codex/event/')) { + return; + } + + const params = notification.params || {}; + const msg = params.msg; + if (!msg || typeof msg !== 'object') { + return; + } + + const threadId = typeof msg.thread_id === 'string' && msg.thread_id.length > 0 ? msg.thread_id : null; + const conversationId = + typeof params.conversationId === 'string' && params.conversationId.length > 0 + ? params.conversationId + : threadId; + + if (threadId) { + this.sessionId = threadId; + } + if (conversationId) { + this.conversationId = conversationId; + } + + if (msg.type === 'agent_message') { + const turnId = typeof params.id === 'string' && params.id.length > 0 ? params.id : msg.turn_id; + if (turnId && typeof msg.message === 'string') { + this.lastAgentMessages.set(turnId, msg.message); + } + } else if (msg.type === 'task_complete') { + const turnId = typeof msg.turn_id === 'string' && msg.turn_id.length > 0 ? msg.turn_id : params.id; + this.resolveTurn(turnId, msg.last_agent_message); + } + + this.handler?.(msg); + } + + private resolveTurn(turnId: string | null | undefined, finalMessage?: string): void { + if (!turnId) { + return; + } + const pending = this.pendingTurns.get(turnId); + if (!pending) { + return; + } + + clearTimeout(pending.timer); + this.pendingTurns.delete(turnId); + const message = typeof finalMessage === 'string' && finalMessage.length > 0 + ? finalMessage + : this.lastAgentMessages.get(turnId) || ''; + this.lastAgentMessages.delete(turnId); + + pending.resolve({ + content: message ? [{ type: 'text', text: message }] : [], + structuredContent: { + threadId: this.sessionId, + content: message + } + }); + } + + private buildThreadParams(config?: Partial): Record { + const params: Record = {}; + if (config?.cwd) { + params.cwd = config.cwd; + } + if (config?.config) { + params.config = config.config; + } + if (config?.['approval-policy']) { + params.approvalPolicy = config['approval-policy']; + } + if (config?.sandbox) { + params.sandbox = config.sandbox; + } + if (config?.model) { + params.model = config.model; + } + return params; + } + + private buildTurnParams(prompt: string, config?: Partial): Record { + const params: Record = { + threadId: this.sessionId, + input: [ + { + type: 'text', + text: prompt + } + ] + }; + if (config?.cwd) { + params.cwd = config.cwd; + } + if (config?.['approval-policy']) { + params.approvalPolicy = config['approval-policy']; + } + if (config?.sandbox) { + params.sandboxPolicy = mapSandboxModeToPolicy(config.sandbox); + } + if (config?.model) { + params.model = config.model; + } + return params; + } + + private async ensureThreadResumed(config?: Partial, options?: { timeout?: number; signal?: AbortSignal }): Promise { + if (this.resumed) { + return; + } + if (!this.sessionId) { + throw new Error('No saved Codex thread id to resume'); + } + + const response = await this.request('thread/resume', { + threadId: this.sessionId, + ...this.buildThreadParams(config) + }, options); + const threadId = response?.thread?.id; + if (typeof threadId === 'string' && threadId.length > 0) { + this.sessionId = threadId; + this.conversationId = threadId; + } + this.resumed = true; + this.threadConfig = { ...this.threadConfig, ...config }; + logger.debug('[CodexAppServer] Resumed thread', { threadId: this.sessionId }); + } + + private async waitForTurn(turnId: string | null | undefined, options?: { timeout?: number; signal?: AbortSignal }): Promise { + if (!turnId) { + return { + content: [], + structuredContent: { + threadId: this.sessionId, + content: '' + } + }; + } + + if (options?.signal?.aborted) { + void this.interruptTurn(turnId); + throw createAbortError(); + } + + return new Promise((resolve, reject) => { + let settled = false; + + const cleanupAbort = () => { + options?.signal?.removeEventListener('abort', onAbort); + }; + + const settleReject = (error: Error) => { + if (settled) { + return; + } + settled = true; + cleanupAbort(); + clearTimeout(timer); + this.pendingTurns.delete(turnId); + reject(error); + }; + + const settleResolve = (value: CodexToolResponse & { structuredContent?: any }) => { + if (settled) { + return; + } + settled = true; + cleanupAbort(); + clearTimeout(timer); + this.pendingTurns.delete(turnId); + resolve(value); + }; + + const onAbort = () => { + void this.interruptTurn(turnId); + settleReject(createAbortError()); + }; + + const timer = setTimeout(() => { + settleReject(new Error(`Codex app-server turn timed out: ${turnId}`)); + }, options?.timeout ?? DEFAULT_TIMEOUT); + + if (options?.signal) { + options.signal.addEventListener('abort', onAbort, { once: true }); + } + + this.pendingTurns.set(turnId, { resolve: settleResolve, reject: settleReject, timer }); + }); + } + + private async interruptTurn(turnId: string): Promise { + if (!this.sessionId || !turnId) { + return; + } + try { + await this.request('turn/interrupt', { + threadId: this.sessionId, + turnId + }, { timeout: 5000 }); + logger.debug('[CodexAppServer] Interrupted turn', { threadId: this.sessionId, turnId }); + } catch (error) { + logger.debug('[CodexAppServer] Failed to interrupt turn', { + threadId: this.sessionId, + turnId, + error: stringifyError(error) + }); + } + } + + private async runTurn(prompt: string, config?: Partial, options?: { timeout?: number; signal?: AbortSignal }): Promise { + const response = await this.request('turn/start', this.buildTurnParams(prompt, config), options); + const turnId = response?.turn?.id; + return this.waitForTurn(turnId, options); + } + + async startSession(config: CodexSessionConfig, options?: { signal?: AbortSignal }): Promise { + if (!this.connected) { + await this.connect(); + } + + const response = await this.request('thread/start', this.buildThreadParams(config), options); + const threadId = response?.thread?.id; + if (!threadId) { + throw new Error('Codex app-server did not return a thread id'); + } + + this.sessionId = threadId; + this.conversationId = threadId; + this.resumed = true; + this.threadConfig = { ...config }; + logger.debug('[CodexAppServer] Started thread', { threadId }); + return this.runTurn(config.prompt, config, options); + } + + async continueSession( + prompt: string, + options?: { signal?: AbortSignal; happyConfig?: Partial } + ): Promise { + if (!this.connected) { + await this.connect(); + } + + const config = options?.happyConfig || this.threadConfig; + await this.ensureThreadResumed(config, options); + this.threadConfig = { ...this.threadConfig, ...config }; + return this.runTurn(prompt, config, options); + } +} diff --git a/packages/happy-cli/src/codex/codexMcpClient.ts b/packages/happy-cli/src/codex/codexMcpClient.ts index 25f2e6854d..c91332d1ca 100644 --- a/packages/happy-cli/src/codex/codexMcpClient.ts +++ b/packages/happy-cli/src/codex/codexMcpClient.ts @@ -262,7 +262,7 @@ export class CodexMcpClient { return response as CodexToolResponse; } - async continueSession(prompt: string, options?: { signal?: AbortSignal }): Promise { + async continueSession(prompt: string, options?: { signal?: AbortSignal; happyConfig?: Partial }): Promise { if (!this.connected) await this.connect(); if (!this.sessionId) { @@ -292,6 +292,20 @@ export class CodexMcpClient { return response as CodexToolResponse; } + seedSessionIdentifiers(sessionId: string | null, conversationId: string | null): void { + const normalizedSessionId = typeof sessionId === 'string' && sessionId.length > 0 ? sessionId : null; + const normalizedConversationId = + typeof conversationId === 'string' && conversationId.length > 0 + ? conversationId + : normalizedSessionId; + this.sessionId = normalizedSessionId; + this.conversationId = normalizedConversationId; + logger.debug('[CodexMCP] Session identifiers seeded', { + sessionId: this.sessionId, + conversationId: this.conversationId, + }); + } + private updateIdentifiersFromEvent(event: any): void { if (!event || typeof event !== 'object') { diff --git a/packages/happy-cli/src/codex/runCodex.ts b/packages/happy-cli/src/codex/runCodex.ts index 0d51a7b974..1e8e7cd049 100644 --- a/packages/happy-cli/src/codex/runCodex.ts +++ b/packages/happy-cli/src/codex/runCodex.ts @@ -2,6 +2,7 @@ import { render } from "ink"; import React from "react"; import { ApiClient } from '@/api/api'; import { CodexMcpClient } from './codexMcpClient'; +import { CodexAppServerClient } from './codexAppServerClient'; import { CodexPermissionHandler } from './utils/permissionHandler'; import { ReasoningProcessor } from './utils/reasoningProcessor'; import { DiffProcessor } from './utils/diffProcessor'; @@ -31,6 +32,7 @@ import { stopCaffeinate } from "@/utils/caffeinate"; import { connectionState } from '@/utils/serverConnectionErrors'; import { setupOfflineReconnection } from '@/utils/setupOfflineReconnection'; import type { ApiSessionClient } from '@/api/apiSession'; +import type { Session as HappySession } from '@/api/types'; import { resolveCodexExecutionPolicy } from './executionPolicy'; import { mapCodexMcpMessageToSessionEnvelopes, mapCodexProcessorMessageToSessionEnvelopes } from './utils/sessionProtocolMapper'; @@ -62,6 +64,19 @@ export function emitReadyIfIdle({ pending, queueSize, shouldExit, sendReady, not return true; } +export function normalizeResumeUserText(text: string): string { + return text.replace(/\r\n/g, '\n').trim(); +} + +export function filterBufferedResumeMessages( + messages: T[], + recentResumeUserTexts: Set, +): T[] { + return messages.filter(({ text }) => ( + !recentResumeUserTexts.has(normalizeResumeUserText(text)) + )); +} + /** * Main entry point for the codex command with ink UI */ @@ -81,7 +96,7 @@ export async function runCodex(opts: { // Define session // - const sessionTag = randomUUID(); + const sessionTag = process.env.HAPPY_SESSION_TAG_OVERRIDE || randomUUID(); // Set backend for offline warnings (before any API calls) connectionState.setBackend('Codex'); @@ -118,7 +133,156 @@ export async function runCodex(opts: { startedBy: opts.startedBy, sandbox: sandboxConfig, }); - const response = await api.getOrCreateSession({ tag: sessionTag, metadata, state }); + let currentCodexSessionId: string | null = null; + let currentCodexConversationId: string | null = null; + let bootResumeFile: string | null = null; + + const loadCodexIdentifiersFromSnapshot = (): boolean => { + const restoreSessionId = process.env.HAPPY_RESTORE_SESSION_ID; + const restoreSessionTag = process.env.HAPPY_RESTORE_SESSION_TAG || sessionTag; + const candidates: string[] = []; + if (restoreSessionId) { + candidates.push(join(os.homedir(), '.happy-session-crypto', `session-${restoreSessionId}.json`)); + } + if (restoreSessionTag) { + candidates.push(join(os.homedir(), '.happy-session-crypto', `tag-${restoreSessionTag}.json`)); + } + + for (const candidatePath of candidates) { + try { + if (!fs.existsSync(candidatePath)) { + continue; + } + const parsed = JSON.parse(fs.readFileSync(candidatePath, 'utf8')); + const sid = typeof parsed?.codexSessionId === 'string' && parsed.codexSessionId.length > 0 + ? parsed.codexSessionId + : null; + const cid = typeof parsed?.codexConversationId === 'string' && parsed.codexConversationId.length > 0 + ? parsed.codexConversationId + : null; + if (!sid && !cid) { + continue; + } + currentCodexSessionId = sid || cid; + currentCodexConversationId = cid || sid; + logger.debug('[SessionCrypto] Loaded codex identifiers from snapshot', { + source: candidatePath, + codexSessionId: currentCodexSessionId, + codexConversationId: currentCodexConversationId + }); + return true; + } catch (error) { + logger.debug(`[SessionCrypto] Failed loading codex identifiers from ${candidatePath}`, error); + } + } + + return false; + }; + + const persistSessionSnapshot = (sessionToPersist: ApiSessionClient | HappySession): void => { + try { + const rawSession = sessionToPersist as any; + const persistedSessionId = rawSession?.id || rawSession?.sessionId; + const persistedEncryptionKey = rawSession?.encryptionKey; + const persistedEncryptionVariant = rawSession?.encryptionVariant; + + if (!persistedSessionId || !persistedEncryptionKey || !persistedEncryptionVariant) { + return; + } + const snapshotDir = join(os.homedir(), '.happy-session-crypto'); + if (!fs.existsSync(snapshotDir)) { + fs.mkdirSync(snapshotDir, { recursive: true, mode: 0o700 }); + } + + const payload: Record = { + sessionId: persistedSessionId, + sessionTag, + encryptionVariant: persistedEncryptionVariant, + encryptionKeyBase64: Buffer.from(persistedEncryptionKey).toString('base64'), + savedAt: Date.now() + }; + + if (currentCodexSessionId) { + payload.codexSessionId = currentCodexSessionId; + } + if (currentCodexConversationId) { + payload.codexConversationId = currentCodexConversationId; + } + + const sessionPath = join(snapshotDir, `session-${persistedSessionId}.json`); + const tagPath = join(snapshotDir, `tag-${sessionTag}.json`); + fs.writeFileSync(sessionPath, JSON.stringify(payload, null, 2), { mode: 0o600 }); + fs.writeFileSync(tagPath, JSON.stringify(payload, null, 2), { mode: 0o600 }); + logger.debug(`[SessionCrypto] Persisted session snapshot: ${persistedSessionId} (tag: ${sessionTag})`); + } catch (error) { + logger.debug('[SessionCrypto] Failed to persist session snapshot', error); + } + }; + + const loadFallbackSessionFromSnapshot = (): HappySession | null => { + const restoreSessionId = process.env.HAPPY_RESTORE_SESSION_ID; + const restoreSessionTag = process.env.HAPPY_RESTORE_SESSION_TAG || sessionTag; + const candidates: string[] = []; + if (restoreSessionId) { + candidates.push(join(os.homedir(), '.happy-session-crypto', `session-${restoreSessionId}.json`)); + } + if (restoreSessionTag) { + candidates.push(join(os.homedir(), '.happy-session-crypto', `tag-${restoreSessionTag}.json`)); + } + + for (const candidatePath of candidates) { + try { + if (!fs.existsSync(candidatePath)) { + continue; + } + const parsed = JSON.parse(fs.readFileSync(candidatePath, 'utf8')); + if (!parsed?.sessionId || !parsed?.encryptionKeyBase64 || !parsed?.encryptionVariant) { + continue; + } + + if (typeof parsed.codexSessionId === 'string' && parsed.codexSessionId.length > 0) { + currentCodexSessionId = parsed.codexSessionId; + } + if (typeof parsed.codexConversationId === 'string' && parsed.codexConversationId.length > 0) { + currentCodexConversationId = parsed.codexConversationId; + } + if (!currentCodexSessionId && currentCodexConversationId) { + currentCodexSessionId = currentCodexConversationId; + } + if (!currentCodexConversationId && currentCodexSessionId) { + currentCodexConversationId = currentCodexSessionId; + } + + const encryptionKey = Uint8Array.from(Buffer.from(parsed.encryptionKeyBase64, 'base64')); + logger.debug(`[SessionCrypto] Loaded fallback snapshot from ${candidatePath}`); + return { + id: parsed.sessionId, + seq: 0, + metadata, + metadataVersion: 0, + agentState: state, + agentStateVersion: 0, + encryptionKey, + encryptionVariant: parsed.encryptionVariant + }; + } catch (error) { + logger.debug(`[SessionCrypto] Failed loading fallback snapshot from ${candidatePath}`, error); + } + } + + return null; + }; + + loadCodexIdentifiersFromSnapshot(); + + let response: HappySession | null = await api.getOrCreateSession({ tag: sessionTag, metadata, state }); + if (!response) { + const fallbackResponse = loadFallbackSessionFromSnapshot(); + if (fallbackResponse) { + response = fallbackResponse; + logger.debug(`[SessionCrypto] Using fallback snapshot session: ${fallbackResponse.id}`); + } + } // Handle server unreachable case - create offline stub with hot reconnection let session: ApiSessionClient; @@ -133,6 +297,16 @@ export async function runCodex(opts: { response, onSessionSwap: (newSession) => { session = newSession; + persistSessionSnapshot(newSession); + notifyDaemonSessionStarted(newSession.sessionId, metadata).then((result) => { + if (result?.error) { + logger.debug(`[START] Failed to report swapped session to daemon:`, result.error); + } else { + logger.debug(`[START] Reported swapped session ${newSession.sessionId} to daemon`); + } + }).catch((error) => { + logger.debug('[START] Failed to report swapped session to daemon (exception):', error); + }); // Update permission handler with new session to avoid stale reference if (permissionHandler) { permissionHandler.updateSession(newSession); @@ -140,6 +314,7 @@ export async function runCodex(opts: { } }); session = initialSession; + persistSessionSnapshot(initialSession); // Always report to daemon if it exists (skip if offline) if (response) { @@ -165,6 +340,30 @@ export async function runCodex(opts: { // Use shared PermissionMode type from api/types for cross-agent compatibility let currentPermissionMode: import('@/api/types').PermissionMode | undefined = undefined; let currentModel: string | undefined = undefined; + let resumeBootstrapPending = false; + let recentResumeUserTexts = new Set(); + const bufferedResumeMessages: Array<{ text: string; mode: EnhancedMode }> = []; + + function flushBufferedResumeMessages() { + if (!bufferedResumeMessages.length) { + return; + } + + const pendingMessages = bufferedResumeMessages.splice(0); + const filteredMessages = filterBufferedResumeMessages(pendingMessages, recentResumeUserTexts); + const droppedCount = pendingMessages.length - filteredMessages.length; + + if (droppedCount > 0) { + logger.debug('[Codex] Dropped duplicate pre-resume user messages', { + droppedCount, + keptCount: filteredMessages.length + }); + } + + for (const { text, mode } of filteredMessages) { + messageQueue.push(text, mode); + } + } session.onUserMessage((message) => { // Resolve permission mode (accept all modes, will be mapped in switch statement) @@ -191,6 +390,14 @@ export async function runCodex(opts: { permissionMode: messagePermissionMode || 'default', model: messageModel, }; + if (resumeBootstrapPending) { + logger.debug('[Codex] Buffering user message until saved Codex thread resume completes'); + bufferedResumeMessages.push({ + text: message.content.text, + mode: enhancedMode + }); + return; + } messageQueue.push(message.content.text, enhancedMode); }); let thinking = false; @@ -361,7 +568,11 @@ export async function runCodex(opts: { // Start Context // - const client = new CodexMcpClient(sandboxConfig); + const useRestoreClient = Boolean(currentCodexSessionId || currentCodexConversationId); + logger.debug(useRestoreClient ? '[Codex] Using app-server restore client' : '[Codex] Using MCP client'); + const client: CodexMcpClient | CodexAppServerClient = useRestoreClient + ? new CodexAppServerClient() + : new CodexMcpClient(sandboxConfig); // Helper: find Codex session transcript for a given sessionId function findCodexResumeFile(sessionId: string | null): string | null { @@ -404,6 +615,109 @@ export async function runCodex(opts: { return null; } } + + function readResumeTranscriptMessages(resumeFile: string | null): Array<{ role: 'User' | 'Assistant'; text: string }> { + if (!resumeFile) return []; + try { + const raw = fs.readFileSync(resumeFile, 'utf8'); + const lines = raw.split('\n').filter((line) => line.trim().length > 0); + const messages: Array<{ role: 'User' | 'Assistant'; text: string }> = []; + for (const line of lines) { + let parsed: any; + try { + parsed = JSON.parse(line); + } catch { + continue; + } + + if (parsed?.type !== 'response_item' || parsed?.payload?.type !== 'message') { + continue; + } + + const role = parsed.payload.role === 'user' + ? 'User' + : parsed.payload.role === 'assistant' + ? 'Assistant' + : null; + if (!role || !Array.isArray(parsed.payload.content)) { + continue; + } + + const text = parsed.payload.content + .map((item: any) => { + if (role === 'User' && item?.type === 'input_text' && typeof item.text === 'string') { + return item.text; + } + if (role === 'Assistant' && item?.type === 'output_text' && typeof item.text === 'string') { + return item.text; + } + return ''; + }) + .filter(Boolean) + .join('\n') + .trim(); + if (!text) { + continue; + } + + const last = messages[messages.length - 1]; + if (last && last.role === role && last.text === text) { + continue; + } + messages.push({ role, text }); + } + + return messages; + } catch (error) { + logger.debug('[Codex] Failed to build resume prompt context', error); + return []; + } + } + + function buildRecentResumeUserTexts(resumeFile: string | null): Set { + const messages = readResumeTranscriptMessages(resumeFile); + const recentUsers = messages + .filter((message) => message.role === 'User') + .slice(-24) + .map((message) => normalizeResumeUserText(message.text)) + .filter(Boolean); + return new Set(recentUsers); + } + + function buildResumePromptContext(resumeFile: string | null): string { + const messages = readResumeTranscriptMessages(resumeFile); + if (!messages.length) { + return ''; + } + + const selected: string[] = []; + let totalChars = 0; + for (let i = messages.length - 1; i >= 0; i -= 1) { + const formatted = `${messages[i].role}: ${messages[i].text}`; + if (selected.length >= 24 || totalChars + formatted.length > 12000) { + break; + } + selected.push(formatted); + totalChars += formatted.length + 2; + } + selected.reverse(); + + return [ + 'Restored context from the previous unavailable Codex thread.', + 'Treat the following transcript as prior conversation state and preserved memory.', + ...selected + ].join('\n\n'); + } + + if (currentCodexSessionId) { + bootResumeFile = findCodexResumeFile(currentCodexSessionId); + if (bootResumeFile) { + logger.debug('[Codex] Found resume file from session snapshot:', bootResumeFile); + recentResumeUserTexts = buildRecentResumeUserTexts(bootResumeFile); + } else { + logger.debug(`[Codex] No resume file found for snapshot codex session ${currentCodexSessionId}`); + } + } permissionHandler = new CodexPermissionHandler(session); const reasoningProcessor = new ReasoningProcessor((message) => { const envelopes = mapCodexProcessorMessageToSessionEnvelopes(message, { currentTurnId }); @@ -419,6 +733,18 @@ export async function runCodex(opts: { }); client.setPermissionHandler(permissionHandler); client.setHandler((msg) => { + const messageThreadId = typeof msg?.thread_id === 'string' && msg.thread_id.length > 0 ? msg.thread_id : null; + if (messageThreadId && (messageThreadId !== currentCodexSessionId || messageThreadId !== currentCodexConversationId)) { + currentCodexSessionId = messageThreadId; + currentCodexConversationId = messageThreadId; + logger.debug('[SessionCrypto] Updated codex identifiers', { + sessionId: session.sessionId, + codexSessionId: currentCodexSessionId, + codexConversationId: currentCodexConversationId + }); + persistSessionSnapshot(session); + } + logger.debug(`[Codex] MCP message: ${JSON.stringify(msg)}`); // Add messages to the ink UI buffer based on message type @@ -539,6 +865,40 @@ export async function runCodex(opts: { await client.connect(); logger.debug('[codex]: client.connect done'); let wasCreated = false; + if (currentCodexSessionId || currentCodexConversationId) { + client.seedSessionIdentifiers(currentCodexSessionId, currentCodexConversationId); + wasCreated = true; + resumeBootstrapPending = true; + logger.debug('[Codex] Seeded client with saved identifiers; first turn will attempt continueSession'); + messageBuffer.addMessage('Attempting to resume saved Codex thread...', 'status'); + if (client instanceof CodexAppServerClient) { + const bootstrapExecutionPolicy = resolveCodexExecutionPolicy( + currentPermissionMode ?? 'default', + client.sandboxEnabled, + ); + const bootstrapConfig: Partial = { + sandbox: bootstrapExecutionPolicy.sandbox, + 'approval-policy': bootstrapExecutionPolicy.approvalPolicy, + }; + if (currentModel) { + bootstrapConfig.model = currentModel; + } + try { + await client.resumeSavedThread( + bootstrapConfig, + { signal: abortController.signal } + ); + } catch (error) { + logger.debug('[Codex] Initial saved-thread resume bootstrap failed', error); + } finally { + resumeBootstrapPending = false; + flushBufferedResumeMessages(); + } + } else { + resumeBootstrapPending = false; + flushBufferedResumeMessages(); + } + } let currentModeHash: string | null = null; let pending: { message: string; mode: EnhancedMode; isolate: boolean; hash: string } | null = null; // If we restart (e.g., mode change), use this to carry a resume file @@ -612,10 +972,10 @@ export async function runCodex(opts: { message.mode.permissionMode, sandboxManagedByHappy, ); - - if (!wasCreated) { + const startNewSession = async () => { + const basePrompt = first ? message.message + '\n\n' + CHANGE_TITLE_INSTRUCTION : message.message; const startConfig: CodexSessionConfig = { - prompt: first ? message.message + '\n\n' + CHANGE_TITLE_INSTRUCTION : message.message, + prompt: basePrompt, sandbox: executionPolicy.sandbox, 'approval-policy': executionPolicy.approvalPolicy, config: { mcp_servers: mcpServers } @@ -623,44 +983,105 @@ export async function runCodex(opts: { if (message.mode.model) { startConfig.model = message.mode.model; } - - // Check for resume file from multiple sources + let resumeFile: string | null = null; - - // Priority 1: Explicit resume file from mode change + if (nextExperimentalResume) { resumeFile = nextExperimentalResume; - nextExperimentalResume = null; // consume once + nextExperimentalResume = null; logger.debug('[Codex] Using resume file from mode change:', resumeFile); - } - // Priority 2: Resume from stored abort session - else if (storedSessionIdForResume) { + } else if (storedSessionIdForResume) { const abortResumeFile = findCodexResumeFile(storedSessionIdForResume); if (abortResumeFile) { resumeFile = abortResumeFile; logger.debug('[Codex] Using resume file from aborted session:', resumeFile); messageBuffer.addMessage('Resuming from aborted session...', 'status'); } - storedSessionIdForResume = null; // consume once + storedSessionIdForResume = null; + } else if (bootResumeFile) { + resumeFile = bootResumeFile; + bootResumeFile = null; + logger.debug('[Codex] Using resume file from session snapshot:', resumeFile); + messageBuffer.addMessage('Resuming saved context...', 'status'); } - - // Apply resume file if found + if (resumeFile) { (startConfig.config as any).experimental_resume = resumeFile; + const resumePromptContext = buildResumePromptContext(resumeFile); + if (resumePromptContext) { + startConfig.prompt = `${resumePromptContext}\n\nCurrent user message:\n${basePrompt}`; + } } - + await client.startSession( startConfig, { signal: abortController.signal } ); wasCreated = true; first = false; + }; + + if (!wasCreated) { + await startNewSession(); } else { - const response = await client.continueSession( - message.message, - { signal: abortController.signal } - ); - logger.debug('[Codex] continueSession response:', response); + try { + const continueConfig: Partial = { + sandbox: executionPolicy.sandbox, + 'approval-policy': executionPolicy.approvalPolicy + }; + if (message.mode.model) { + continueConfig.model = message.mode.model; + } + + const response = await client.continueSession( + message.message, + { + signal: abortController.signal, + happyConfig: continueConfig + } + ); + logger.debug('[Codex] continueSession response:', response); + + const continueErrorText = (() => { + const typedResponse = response as any; + if (!typedResponse?.isError) { + return ''; + } + if (typeof typedResponse?.structuredContent?.content === 'string') { + return typedResponse.structuredContent.content; + } + if (Array.isArray(typedResponse?.content)) { + return typedResponse.content + .map((item: any) => typeof item?.text === 'string' ? item.text : '') + .filter(Boolean) + .join('\n'); + } + return ''; + })(); + + if (continueErrorText && (/session not found/i.test(continueErrorText) || /thread_id/i.test(continueErrorText))) { + throw new Error(continueErrorText); + } + + first = false; + } catch (continueError) { + const continueMessage = continueError instanceof Error + ? `${continueError.name}: ${continueError.message}` + : String(continueError); + const shouldStartNewSession = + /no active session/i.test(continueMessage) || + /not found/i.test(continueMessage) || + /invalid/i.test(continueMessage); + if (!shouldStartNewSession) { + throw continueError; + } + + logger.debug('[Codex] continueSession failed; falling back to startSession', continueError); + messageBuffer.addMessage('Resume failed, starting a new session...', 'status'); + client.clearSession(); + wasCreated = false; + await startNewSession(); + } } } catch (error) { logger.warn('Error in codex session:', error); diff --git a/packages/happy-cli/src/index.ts b/packages/happy-cli/src/index.ts index ca3c031925..d9c56fa48a 100644 --- a/packages/happy-cli/src/index.ts +++ b/packages/happy-cli/src/index.ts @@ -30,6 +30,10 @@ import { spawnHappyCLI } from './utils/spawnHappyCLI' import { claudeCliPath } from './claude/claudeLocal' import { execFileSync } from 'node:child_process' import { extractNoSandboxFlag } from './utils/sandboxFlags' +import { existsSync, readFileSync, readdirSync, statSync } from 'node:fs' +import { homedir } from 'node:os' +import { join, resolve } from 'node:path' +import { delay } from './utils/time' (async () => { @@ -100,6 +104,11 @@ import { extractNoSandboxFlag } from './utils/sandboxFlags' } else if (subcommand === 'codex') { // Handle codex command try { + if (args[1] === 'resume') { + await handleCodexResumeCommand(args.slice(2)); + return; + } + const { runCodex } = await import('@/codex/runCodex'); // Parse startedBy argument @@ -633,6 +642,7 @@ ${chalk.bold('Usage:')} happy [options] Start Claude with mobile control happy auth Manage authentication happy codex Start Codex mode + happy codex resume Resume Happy Codex session happy gemini Start Gemini mode (ACP) happy acp Start a generic ACP-compatible agent happy connect Connect AI vendor API keys @@ -723,6 +733,325 @@ ${chalk.bold.cyan('Claude Code Options (from `claude --help`):')} } })(); +function printCodexResumeHelp(): void { + console.log(` +${chalk.bold('happy codex resume')} - Resume Happy Codex session + +${chalk.bold('Usage:')} + happy codex resume --metadata-file + happy codex resume --path --pid + +${chalk.bold('Options:')} + --metadata-file Read Happy metadata JSON from file + --path Override metadata.path + --pid Override metadata.hostPid + --session-tag Use known session tag directly + --home-dir Override metadata.homeDir + --happy-home-dir Override metadata.happyHomeDir + --detach Launch restored session in background instead of attaching UI + --dry-run Resolve parameters and print without launching + -h, --help Show help + +${chalk.bold('Examples:')} + happy codex resume cmmexample --metadata-file ./metadata.json + happy codex resume cmmexample --path /Users/me/project --pid 12345 + happy codex resume cmmexample --metadata-file ./metadata.json --detach +`); +} + +function listHappyLogFiles(happyHomeDir: string): string[] { + try { + const logDir = join(happyHomeDir, 'logs') + return readdirSync(logDir) + .map((entry) => join(logDir, entry)) + .filter((fullPath) => { + try { + return statSync(fullPath).isFile() && fullPath.endsWith('.log') + } catch { + return false + } + }) + .sort((a, b) => { + try { + return statSync(b).mtimeMs - statSync(a).mtimeMs + } catch { + return 0 + } + }) + } catch { + return [] + } +} + +function extractSessionTagFromLog(logPath: string): string | null { + try { + const raw = readFileSync(logPath, 'utf8') + const matches = [...raw.matchAll(/tag: ([0-9a-f-]+)/g)] + .map((match) => match[1]) + .filter(Boolean) + return matches.length > 0 ? matches[matches.length - 1] : null + } catch { + return null + } +} + +function findLatestLogForPid(happyHomeDir: string, pid: string | null): string | null { + if (!pid) { + return null + } + const needle = `pid-${pid}.log` + return listHappyLogFiles(happyHomeDir).find((fullPath) => fullPath.includes(needle)) || null +} + +function findLatestLogForSessionId(happyHomeDir: string, sessionId: string): string | null { + const needle = `Session created/loaded: ${sessionId}` + for (const logPath of listHappyLogFiles(happyHomeDir)) { + try { + if (readFileSync(logPath, 'utf8').includes(needle)) { + return logPath + } + } catch { + } + } + return null +} + +function parseCodexResumeArgs(args: string[]) { + let sessionId: string | null = null + let metadataFile: string | null = null + let workdir: string | null = null + let pid: string | null = null + let sessionTag: string | null = null + let homeDir: string | null = null + let happyHomeDir: string | null = null + let detach = false + let dryRun = false + let showHelp = false + + for (let i = 0; i < args.length; i++) { + const arg = args[i] + if (arg === '-h' || arg === '--help') { + showHelp = true + } else if (arg === '--metadata-file') { + metadataFile = args[++i] + } else if (arg === '--path') { + workdir = args[++i] + } else if (arg === '--pid') { + pid = args[++i] + } else if (arg === '--session-tag') { + sessionTag = args[++i] + } else if (arg === '--home-dir') { + homeDir = args[++i] + } else if (arg === '--happy-home-dir') { + happyHomeDir = args[++i] + } else if (arg === '--detach') { + detach = true + } else if (arg === '--dry-run') { + dryRun = true + } else if (!sessionId) { + sessionId = arg + } else { + throw new Error(`Unknown argument for codex resume: ${arg}`) + } + } + + return { + sessionId, + metadataFile, + workdir, + pid, + sessionTag, + homeDir, + happyHomeDir, + detach, + dryRun, + showHelp + } +} + +async function resolveCodexResumeParameters(args: string[]) { + const parsed = parseCodexResumeArgs(args) + if (parsed.showHelp) { + printCodexResumeHelp() + return { exit: 0 as number | null } + } + + if (!parsed.sessionId) { + printCodexResumeHelp() + return { exit: 1 as number | null } + } + + let metadata: Record = {} + if (parsed.metadataFile) { + try { + metadata = JSON.parse(readFileSync(resolve(parsed.metadataFile), 'utf8')) + } catch (error) { + throw new Error(`Failed to read metadata file: ${error instanceof Error ? error.message : String(error)}`) + } + } + + const homeDir = parsed.homeDir || metadata.homeDir || homedir() + const happyHomeDir = parsed.happyHomeDir || metadata.happyHomeDir || join(homeDir, '.happy') + const workdir = parsed.workdir || metadata.path + const pidValue = parsed.pid || metadata.hostPid + const hostPid = pidValue === undefined || pidValue === null || pidValue === '' ? null : String(pidValue) + + const sessionSnapshotPath = join(homeDir, '.happy-session-crypto', `session-${parsed.sessionId}.json`) + let snapshot: Record | null = null + if (existsSync(sessionSnapshotPath)) { + try { + snapshot = JSON.parse(readFileSync(sessionSnapshotPath, 'utf8')) + } catch { + } + } + + const logPathFromPid = hostPid ? findLatestLogForPid(happyHomeDir, hostPid) : null + const logPath = logPathFromPid || findLatestLogForSessionId(happyHomeDir, parsed.sessionId) + const resolvedSessionTag = parsed.sessionTag || snapshot?.sessionTag || (logPath ? extractSessionTagFromLog(logPath) : null) + + if (!workdir) { + throw new Error('Could not resolve workdir. Provide --path or metadata.path') + } + if (!existsSync(workdir)) { + throw new Error(`Workdir does not exist: ${workdir}`) + } + if (!resolvedSessionTag) { + throw new Error('Could not resolve session tag. Provide --session-tag or metadata.hostPid with available logs') + } + + const tagSnapshotPath = join(homeDir, '.happy-session-crypto', `tag-${resolvedSessionTag}.json`) + if (!existsSync(sessionSnapshotPath) && !existsSync(tagSnapshotPath)) { + throw new Error(`No restore snapshot found for session ${parsed.sessionId}. Expected ${sessionSnapshotPath} or ${tagSnapshotPath}`) + } + + const settings = await readSettings() + if (metadata.machineId && settings?.machineId && metadata.machineId !== settings.machineId) { + logger.warn(`[codex resume] metadata.machineId (${metadata.machineId}) differs from local machineId (${settings.machineId})`) + } + + return { + exit: null as number | null, + sessionId: parsed.sessionId, + sessionTag: resolvedSessionTag, + workdir, + hostPid, + homeDir, + happyHomeDir, + logPath, + sessionSnapshotPath, + tagSnapshotPath, + metadata, + detach: parsed.detach, + dryRun: parsed.dryRun + } +} + +async function handleCodexResumeCommand(args: string[]): Promise { + const resolved = await resolveCodexResumeParameters(args) + if (resolved.exit !== null) { + process.exit(resolved.exit) + } + + const env = { + ...process.env, + HAPPY_RESTORE_SESSION_ID: resolved.sessionId, + HAPPY_RESTORE_SESSION_TAG: resolved.sessionTag, + HAPPY_SESSION_TAG_OVERRIDE: resolved.sessionTag + } + + const commandPreview = + `cd ${JSON.stringify(resolved.workdir)} && ` + + `HAPPY_RESTORE_SESSION_ID=${JSON.stringify(resolved.sessionId)} ` + + `HAPPY_RESTORE_SESSION_TAG=${JSON.stringify(resolved.sessionTag)} ` + + `HAPPY_SESSION_TAG_OVERRIDE=${JSON.stringify(resolved.sessionTag)} ` + + `happy codex --happy-starting-mode remote --started-by daemon` + + if (resolved.dryRun) { + console.log('Resolved Happy Codex resume parameters:') + console.log(` Session ID: ${resolved.sessionId}`) + console.log(` Session tag: ${resolved.sessionTag}`) + console.log(` Workdir: ${resolved.workdir}`) + console.log(` Home dir: ${resolved.homeDir}`) + console.log(` Happy home: ${resolved.happyHomeDir}`) + console.log(` Launch mode: ${resolved.detach ? 'detached' : 'attached'}`) + console.log(` Prior log: ${resolved.logPath || 'not found'}`) + console.log(` Session snapshot: ${resolved.sessionSnapshotPath}`) + console.log(` Tag snapshot: ${resolved.tagSnapshotPath}`) + console.log('') + console.log(commandPreview) + return + } + + try { + const sessions = await listDaemonSessions() + const existing = sessions.find((session) => session.happySessionId === resolved.sessionId) + if (existing) { + console.log(`Stopping existing session ${resolved.sessionId} (pid ${existing.pid})...`) + await stopDaemonSession(resolved.sessionId!) + await delay(500) + } + } catch { + } + + if (!resolved.detach) { + console.log('Starting restored Happy Codex session in attached mode...') + const child = spawnHappyCLI( + ['codex', '--happy-starting-mode', 'remote', '--started-by', 'daemon'], + { + cwd: resolved.workdir, + stdio: 'inherit', + env + } + ) + + const exitCode = await new Promise((resolve, reject) => { + child.once('error', reject) + child.once('exit', (code, signal) => { + if (typeof code === 'number') { + resolve(code) + return + } + if (signal) { + console.error(`Restored Happy Codex exited with signal ${signal}`) + resolve(1) + return + } + resolve(0) + }) + }) + + process.exit(exitCode) + } + + const child = spawnHappyCLI( + ['codex', '--happy-starting-mode', 'remote', '--started-by', 'daemon'], + { + cwd: resolved.workdir, + detached: true, + stdio: 'ignore', + env + } + ) + child.unref() + + if (!child.pid) { + throw new Error('Failed to spawn happy codex resume process') + } + + await delay(1200) + const newLogPath = findLatestLogForPid(resolved.happyHomeDir, String(child.pid)) + + console.log('Happy Codex resume started') + console.log(` Session ID: ${resolved.sessionId}`) + console.log(` Session tag: ${resolved.sessionTag}`) + console.log(` Workdir: ${resolved.workdir}`) + console.log(` PID: ${child.pid}`) + if (newLogPath) { + console.log(` Log: ${newLogPath}`) + } + console.log(` Command: ${commandPreview}`) +} + /** * Handle notification command