diff --git a/apps/code/src/main/services/cloud-task/schemas.ts b/apps/code/src/main/services/cloud-task/schemas.ts index b7f1a811a..84a6f532e 100644 --- a/apps/code/src/main/services/cloud-task/schemas.ts +++ b/apps/code/src/main/services/cloud-task/schemas.ts @@ -1,11 +1,13 @@ -import type { CloudTaskUpdatePayload, TaskRun } from "@shared/types"; +import { + type CloudTaskUpdatePayload, + isTerminalStatus, + type TaskRunStatus, + TERMINAL_STATUSES, +} from "@shared/types"; import { z } from "zod"; -export type { CloudTaskUpdatePayload }; - -// --- Terminal statuses --- - -export const TERMINAL_STATUSES = ["completed", "failed", "cancelled"] as const; +export type { CloudTaskUpdatePayload, TaskRunStatus }; +export { TERMINAL_STATUSES, isTerminalStatus }; // --- Events --- @@ -17,8 +19,6 @@ export interface CloudTaskEvents { [CloudTaskEvent.Update]: CloudTaskUpdatePayload; } -export type TaskRunStatus = TaskRun["status"]; - // --- tRPC Schemas --- export const watchInput = z.object({ @@ -26,7 +26,6 @@ export const watchInput = z.object({ runId: z.string(), apiHost: z.string(), teamId: z.number(), - viewing: z.boolean().optional(), }); export type WatchInput = z.infer; @@ -36,15 +35,14 @@ export const unwatchInput = z.object({ runId: z.string(), }); -export const onUpdateInput = z.object({ +export const retryInput = z.object({ taskId: z.string(), runId: z.string(), }); -export const setViewingInput = z.object({ +export const onUpdateInput = z.object({ taskId: z.string(), runId: z.string(), - viewing: z.boolean(), }); export const sendCommandInput = z.object({ diff --git a/apps/code/src/main/services/cloud-task/service.test.ts b/apps/code/src/main/services/cloud-task/service.test.ts new file mode 100644 index 000000000..0f112dded --- /dev/null +++ b/apps/code/src/main/services/cloud-task/service.test.ts @@ -0,0 +1,502 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { CloudTaskEvent } from "./schemas"; + +const mockNetFetch = vi.hoisted(() => vi.fn()); +const mockStreamFetch = vi.hoisted(() => vi.fn()); + +vi.mock("electron", () => ({ + net: { + fetch: mockNetFetch, + }, +})); + +vi.mock("../../utils/logger", () => ({ + logger: { + scope: () => ({ + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + debug: vi.fn(), + }), + }, +})); + +import { CloudTaskService } from "./service"; + +const mockAuthService = { + authenticatedFetch: vi.fn(), +}; + +function createJsonResponse( + data: unknown, + status = 200, + headers?: Record, +): Response { + return new Response(JSON.stringify(data), { + status, + headers: { "Content-Type": "application/json", ...(headers ?? {}) }, + }); +} + +function createSseResponse(payload: string, status = 200): Response { + const encoder = new TextEncoder(); + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(encoder.encode(payload)); + controller.close(); + }, + }); + + return new Response(stream, { + status, + headers: { "Content-Type": "text/event-stream" }, + }); +} + +function createOpenSseResponse(payload: string, status = 200): Response { + const encoder = new TextEncoder(); + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(encoder.encode(payload)); + }, + }); + + return new Response(stream, { + status, + headers: { "Content-Type": "text/event-stream" }, + }); +} + +async function waitFor( + predicate: () => boolean, + timeoutMs = 2_000, +): Promise { + const start = Date.now(); + while (!predicate()) { + if (Date.now() - start > timeoutMs) { + throw new Error("Timed out waiting for condition"); + } + if (vi.isFakeTimers()) { + await vi.advanceTimersByTimeAsync(10); + } else { + await new Promise((resolve) => setTimeout(resolve, 10)); + } + } +} + +describe("CloudTaskService", () => { + let service: CloudTaskService; + + beforeEach(() => { + service = new CloudTaskService(mockAuthService as never); + mockNetFetch.mockReset(); + mockStreamFetch.mockReset(); + mockAuthService.authenticatedFetch.mockReset(); + vi.stubGlobal("fetch", mockStreamFetch); + + mockAuthService.authenticatedFetch.mockImplementation( + async ( + fetchImpl: typeof fetch, + input: string | Request, + init?: RequestInit, + ) => { + return fetchImpl(input, { + ...init, + headers: { + ...(init?.headers ?? {}), + Authorization: "Bearer token", + }, + }); + }, + ); + }); + + afterEach(() => { + service.unwatchAll(); + vi.useRealTimers(); + vi.unstubAllGlobals(); + }); + + it("bootstraps paged backlog for active runs and drains deduped live SSE entries", async () => { + const updates: unknown[] = []; + service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); + + mockNetFetch + .mockResolvedValueOnce( + createJsonResponse({ + id: "run-1", + status: "in_progress", + stage: "build", + output: null, + error_message: null, + branch: "main", + updated_at: "2026-01-01T00:00:00Z", + }), + ) + .mockResolvedValueOnce( + createJsonResponse( + [ + { + type: "notification", + timestamp: "2026-01-01T00:00:00Z", + notification: { + jsonrpc: "2.0", + method: "_posthog/console", + params: { + sessionId: "run-1", + level: "info", + message: "older history", + }, + }, + }, + ], + 200, + { "X-Has-More": "true" }, + ), + ) + .mockResolvedValueOnce( + createJsonResponse( + [ + { + type: "notification", + timestamp: "2026-01-01T00:00:01Z", + notification: { + jsonrpc: "2.0", + method: "_posthog/console", + params: { + sessionId: "run-1", + level: "info", + message: "hello", + }, + }, + }, + ], + 200, + { "X-Has-More": "false" }, + ), + ); + + mockStreamFetch.mockResolvedValueOnce( + createOpenSseResponse( + 'id: 1\ndata: {"type":"notification","timestamp":"2026-01-01T00:00:01Z","notification":{"jsonrpc":"2.0","method":"_posthog/console","params":{"sessionId":"run-1","level":"info","message":"hello"}}}\n\nid: 2\ndata: {"type":"notification","timestamp":"2026-01-01T00:00:02Z","notification":{"jsonrpc":"2.0","method":"_posthog/console","params":{"sessionId":"run-1","level":"info","message":"live tail"}}}\n\n', + ), + ); + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + await waitFor(() => updates.length >= 2); + + expect(updates).toEqual([ + { + taskId: "task-1", + runId: "run-1", + kind: "snapshot", + newEntries: [ + { + type: "notification", + timestamp: "2026-01-01T00:00:00Z", + notification: { + jsonrpc: "2.0", + method: "_posthog/console", + params: { + sessionId: "run-1", + level: "info", + message: "older history", + }, + }, + }, + { + type: "notification", + timestamp: "2026-01-01T00:00:01Z", + notification: { + jsonrpc: "2.0", + method: "_posthog/console", + params: { + sessionId: "run-1", + level: "info", + message: "hello", + }, + }, + }, + ], + totalEntryCount: 2, + status: "in_progress", + stage: "build", + output: null, + errorMessage: null, + branch: "main", + }, + { + taskId: "task-1", + runId: "run-1", + kind: "logs", + newEntries: [ + { + type: "notification", + timestamp: "2026-01-01T00:00:02Z", + notification: { + jsonrpc: "2.0", + method: "_posthog/console", + params: { + sessionId: "run-1", + level: "info", + message: "live tail", + }, + }, + }, + ], + totalEntryCount: 3, + }, + ]); + + expect(mockStreamFetch).toHaveBeenCalledWith( + "https://app.example.com/api/projects/2/tasks/task-1/runs/run-1/stream/?start=latest", + expect.objectContaining({ + headers: expect.objectContaining({ + Authorization: "Bearer token", + Accept: "text/event-stream", + }), + }), + ); + }); + + it("reconnects with Last-Event-ID after a stream error", async () => { + vi.useFakeTimers(); + + const updates: unknown[] = []; + service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); + + mockNetFetch + .mockResolvedValueOnce( + createJsonResponse({ + id: "run-1", + status: "in_progress", + stage: null, + output: null, + error_message: null, + branch: "main", + updated_at: "2026-01-01T00:00:00Z", + }), + ) + .mockResolvedValueOnce( + createJsonResponse([], 200, { "X-Has-More": "false" }), + ); + + mockStreamFetch + .mockResolvedValueOnce( + createSseResponse( + 'id: 1\ndata: {"type":"notification","timestamp":"2026-01-01T00:00:01Z","notification":{"jsonrpc":"2.0","method":"_posthog/console","params":{"sessionId":"run-1","level":"info","message":"hello"}}}\n\nevent: error\ndata: {"error":"boom"}\n\n', + ), + ) + .mockResolvedValueOnce( + createOpenSseResponse( + 'id: 2\ndata: {"type":"notification","timestamp":"2026-01-01T00:00:02Z","notification":{"jsonrpc":"2.0","method":"_posthog/console","params":{"sessionId":"run-1","level":"info","message":"again"}}}\n\n', + ), + ); + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + await vi.advanceTimersByTimeAsync(2_000); + await waitFor(() => updates.length >= 2); + + expect(mockStreamFetch).toHaveBeenNthCalledWith( + 2, + "https://app.example.com/api/projects/2/tasks/task-1/runs/run-1/stream/", + expect.objectContaining({ + headers: expect.objectContaining({ + Authorization: "Bearer token", + Accept: "text/event-stream", + "Last-Event-ID": "1", + }), + }), + ); + }); + + it("emits a retryable cloud error after repeated stream failures", async () => { + vi.useFakeTimers(); + + const updates: unknown[] = []; + service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); + + const makeInProgressRun = () => + createJsonResponse({ + id: "run-1", + status: "in_progress", + stage: null, + output: null, + error_message: null, + branch: "main", + updated_at: "2026-01-01T00:00:00Z", + }); + + mockNetFetch + .mockResolvedValueOnce(makeInProgressRun()) // bootstrap: fetchTaskRun + .mockResolvedValueOnce( + createJsonResponse([], 200, { "X-Has-More": "false" }), + ) // bootstrap: fetchSessionLogs + // Each stream error triggers handleStreamCompletion → fetchTaskRun + .mockImplementation(() => Promise.resolve(makeInProgressRun())); + + mockStreamFetch.mockImplementation(() => + Promise.resolve( + createSseResponse('event: error\ndata: {"error":"boom"}\n\n'), + ), + ); + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + await waitFor(() => mockStreamFetch.mock.calls.length === 1); + await vi.advanceTimersByTimeAsync(70_000); + await waitFor( + () => + updates.some( + (u) => + typeof u === "object" && + u !== null && + (u as { kind?: string }).kind === "error", + ), + 10_000, + ); + + expect(mockStreamFetch.mock.calls.length).toBe(6); + // 2 bootstrap calls + 6 handleStreamCompletion calls (one per stream error) + expect(mockNetFetch).toHaveBeenCalledTimes(8); + expect(updates).toContainEqual({ + taskId: "task-1", + runId: "run-1", + kind: "error", + errorTitle: "Cloud stream disconnected", + errorMessage: + "Lost connection to the cloud run stream. Retry to reconnect.", + retryable: true, + }); + }); + + it("loads paginated persisted logs once for an already terminal run", async () => { + const updates: unknown[] = []; + service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); + + mockNetFetch + .mockResolvedValueOnce( + createJsonResponse({ + id: "run-1", + status: "completed", + stage: "build", + output: null, + error_message: null, + branch: "main", + updated_at: "2026-01-01T00:00:00Z", + completed_at: "2026-01-01T00:00:00Z", + }), + ) + .mockResolvedValueOnce( + createJsonResponse( + [ + { + type: "notification", + timestamp: "2026-01-01T00:00:01Z", + notification: { + jsonrpc: "2.0", + method: "_posthog/console", + params: { + sessionId: "run-1", + level: "info", + message: "done-1", + }, + }, + }, + ], + 200, + { "X-Has-More": "true" }, + ), + ) + .mockResolvedValueOnce( + createJsonResponse( + [ + { + type: "notification", + timestamp: "2026-01-01T00:00:02Z", + notification: { + jsonrpc: "2.0", + method: "_posthog/console", + params: { + sessionId: "run-1", + level: "info", + message: "done-2", + }, + }, + }, + ], + 200, + { "X-Has-More": "false" }, + ), + ); + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + await waitFor(() => updates.length >= 1); + + expect(updates).toEqual([ + { + taskId: "task-1", + runId: "run-1", + kind: "snapshot", + newEntries: [ + { + type: "notification", + timestamp: "2026-01-01T00:00:01Z", + notification: { + jsonrpc: "2.0", + method: "_posthog/console", + params: { + sessionId: "run-1", + level: "info", + message: "done-1", + }, + }, + }, + { + type: "notification", + timestamp: "2026-01-01T00:00:02Z", + notification: { + jsonrpc: "2.0", + method: "_posthog/console", + params: { + sessionId: "run-1", + level: "info", + message: "done-2", + }, + }, + }, + ], + totalEntryCount: 2, + status: "completed", + stage: "build", + output: null, + errorMessage: null, + branch: "main", + }, + ]); + expect(mockNetFetch).toHaveBeenCalledTimes(3); + }); +}); diff --git a/apps/code/src/main/services/cloud-task/service.ts b/apps/code/src/main/services/cloud-task/service.ts index a01b24792..3eeb2072c 100644 --- a/apps/code/src/main/services/cloud-task/service.ts +++ b/apps/code/src/main/services/cloud-task/service.ts @@ -8,18 +8,45 @@ import type { AuthService } from "../auth/service"; import { CloudTaskEvent, type CloudTaskEvents, + isTerminalStatus, type SendCommandInput, type SendCommandOutput, type TaskRunStatus, - TERMINAL_STATUSES, type WatchInput, } from "./schemas"; +import { type SseEvent, SseEventParser } from "./sse-parser"; const log = logger.scope("cloud-task"); -const LOG_POLL_INTERVAL_MS = 500; -const STATUS_POLL_INTERVAL_MS = 60_000; -const STATUS_POLL_INTERVAL_VIEWING_MS = 3_000; +const MAX_SSE_RECONNECT_ATTEMPTS = 5; +const SSE_RECONNECT_BASE_DELAY_MS = 2_000; +const SSE_RECONNECT_MAX_DELAY_MS = 30_000; +const EVENT_BATCH_FLUSH_MS = 16; +const EVENT_BATCH_MAX_SIZE = 50; +const SESSION_LOG_PAGE_LIMIT = 5_000; + +interface SessionLogsPage { + entries: StoredLogEntry[]; + hasMore: boolean; +} + +interface CloudTaskConnectionError { + title: string; + message: string; + retryable: boolean; + autoRetry?: boolean; +} + +class CloudTaskStreamError extends Error { + constructor( + message: string, + public readonly details: CloudTaskConnectionError, + public readonly status?: number, + ) { + super(message); + this.name = "CloudTaskStreamError"; + } +} interface TaskRunResponse { id: string; @@ -28,6 +55,19 @@ interface TaskRunResponse { output?: Record | null; error_message?: string | null; branch?: string | null; + updated_at?: string; + completed_at?: string | null; +} + +interface TaskRunStateEvent { + type: "task_run_state"; + status?: TaskRunStatus; + stage?: string | null; + output?: Record | null; + error_message?: string | null; + branch?: string | null; + updated_at?: string | null; + completed_at?: string | null; } interface WatcherState { @@ -35,24 +75,103 @@ interface WatcherState { runId: string; apiHost: string; teamId: number; - pollTimeoutId: ReturnType | null; - processedLogCount: number; - lastLogCursor: string | null; - lastCursorSeenCount: number; + subscriberCount: number; + sseAbortController: AbortController | null; + reconnectTimeoutId: ReturnType | null; + batchFlushTimeoutId: ReturnType | null; + pendingLogEntries: StoredLogEntry[]; + totalEntryCount: number; + reconnectAttempts: number; + lastEventId: string | null; lastStatus: TaskRunStatus | null; lastStage: string | null; lastOutput: Record | null; lastErrorMessage: string | null; lastBranch: string | null; - lastStatusPollTime: number; - subscriberCount: number; - viewing: boolean; + lastStatusUpdatedAt: string | null; + isBootstrapping: boolean; + hasEmittedSnapshot: boolean; + bufferedLogBatches: StoredLogEntry[][]; + failed: boolean; + needsPostBootstrapReconnect: boolean; + needsStopAfterBootstrap: boolean; } function watcherKey(taskId: string, runId: string): string { return `${taskId}:${runId}`; } +function isTaskRunStateEvent(data: unknown): data is TaskRunStateEvent { + return ( + typeof data === "object" && + data !== null && + (data as { type?: string }).type === "task_run_state" + ); +} + +function createStreamStatusError(status: number): CloudTaskStreamError { + switch (status) { + case 401: + return new CloudTaskStreamError( + "Cloud authentication expired", + { + title: "Cloud authentication expired", + message: "Please reauthenticate and retry the cloud run stream.", + retryable: true, + autoRetry: false, + }, + status, + ); + case 403: + return new CloudTaskStreamError( + "Cloud access denied", + { + title: "Cloud access denied", + message: + "You no longer have access to this cloud run. Reauthenticate and retry.", + retryable: true, + autoRetry: false, + }, + status, + ); + case 404: + return new CloudTaskStreamError( + "Cloud run not found", + { + title: "Cloud run not found", + message: + "This cloud run could not be found. It may have been deleted or moved.", + retryable: false, + autoRetry: false, + }, + status, + ); + case 406: + return new CloudTaskStreamError( + "Cloud stream unavailable", + { + title: "Cloud stream unavailable", + message: + "The backend rejected the live stream request. Restart the backend and retry.", + retryable: true, + autoRetry: false, + }, + status, + ); + default: + return new CloudTaskStreamError( + `Stream request failed with status ${status}`, + { + title: "Cloud stream failed", + message: `The cloud stream request failed with status ${status}. Retry to reconnect.`, + retryable: true, + autoRetry: true, + }, + status, + ); + } +} + @injectable() export class CloudTaskService extends TypedEventEmitter { private watchers = new Map(); @@ -67,13 +186,9 @@ export class CloudTaskService extends TypedEventEmitter { watch(input: WatchInput): void { const key = watcherKey(input.taskId, input.runId); - // If watcher already exists, increment subscriber count const existing = this.watchers.get(key); if (existing) { existing.subscriberCount++; - if (input.viewing && !existing.viewing) { - this.setViewing(input.taskId, input.runId, true); - } log.info("Cloud task watcher subscriber added", { key, subscribers: existing.subscriberCount, @@ -102,25 +217,45 @@ export class CloudTaskService extends TypedEventEmitter { } } - setViewing(taskId: string, runId: string, viewing: boolean): void { + retry(taskId: string, runId: string): void { const key = watcherKey(taskId, runId); const watcher = this.watchers.get(key); if (!watcher) return; - if (watcher.viewing === viewing) return; - watcher.viewing = viewing; + if (watcher.reconnectTimeoutId) { + clearTimeout(watcher.reconnectTimeoutId); + watcher.reconnectTimeoutId = null; + } + + watcher.sseAbortController?.abort(); + watcher.sseAbortController = null; - if (watcher.pollTimeoutId) { - clearTimeout(watcher.pollTimeoutId); - watcher.pollTimeoutId = null; + if (watcher.batchFlushTimeoutId) { + clearTimeout(watcher.batchFlushTimeoutId); + watcher.batchFlushTimeoutId = null; } - if (viewing) { - this.poll(key, true); - } else { - this.schedulePoll(key); + + watcher.reconnectAttempts = 0; + watcher.failed = false; + watcher.pendingLogEntries = []; + watcher.bufferedLogBatches = []; + watcher.needsPostBootstrapReconnect = false; + watcher.needsStopAfterBootstrap = false; + + log.info("Retrying cloud task watcher", { + key, + hasSnapshot: watcher.hasEmittedSnapshot, + }); + + if (!watcher.hasEmittedSnapshot) { + watcher.lastEventId = null; + watcher.totalEntryCount = 0; + watcher.isBootstrapping = false; + void this.bootstrapWatcher(key); + return; } - log.info("Cloud task watcher viewing changed", { key, viewing }); + void this.connectSse(key, { startLatest: !watcher.lastEventId }); } async sendCommand(input: SendCommandInput): Promise { @@ -212,8 +347,6 @@ export class CloudTaskService extends TypedEventEmitter { } } - // --- Private --- - private startWatcher(input: WatchInput, subscriberCount: number): void { const key = watcherKey(input.taskId, input.runId); @@ -222,165 +355,299 @@ export class CloudTaskService extends TypedEventEmitter { runId: input.runId, apiHost: input.apiHost, teamId: input.teamId, - pollTimeoutId: null, - processedLogCount: 0, - lastLogCursor: null, - lastCursorSeenCount: 0, + subscriberCount, + sseAbortController: null, + reconnectTimeoutId: null, + batchFlushTimeoutId: null, + pendingLogEntries: [], + totalEntryCount: 0, + reconnectAttempts: 0, + lastEventId: null, lastStatus: null, lastStage: null, lastOutput: null, lastErrorMessage: null, lastBranch: null, - lastStatusPollTime: 0, - subscriberCount, - viewing: input.viewing ?? false, + lastStatusUpdatedAt: null, + isBootstrapping: false, + hasEmittedSnapshot: false, + bufferedLogBatches: [], + failed: false, + needsPostBootstrapReconnect: false, + needsStopAfterBootstrap: false, }; this.watchers.set(key, watcher); log.info("Cloud task watcher started", { key }); - - // Immediate first poll (snapshot) - this.poll(key, true); + void this.bootstrapWatcher(key); } private stopWatcher(key: string): void { const watcher = this.watchers.get(key); if (!watcher) return; - if (watcher.pollTimeoutId) { - clearTimeout(watcher.pollTimeoutId); - watcher.pollTimeoutId = null; + watcher.sseAbortController?.abort(); + + if (watcher.reconnectTimeoutId) { + clearTimeout(watcher.reconnectTimeoutId); + watcher.reconnectTimeoutId = null; } + if (watcher.batchFlushTimeoutId) { + clearTimeout(watcher.batchFlushTimeoutId); + watcher.batchFlushTimeoutId = null; + } + + this.flushLogBatch(key); this.watchers.delete(key); log.info("Cloud task watcher stopped", { key }); } - private schedulePoll(key: string): void { + private async bootstrapWatcher(key: string): Promise { const watcher = this.watchers.get(key); if (!watcher) return; - const interval = watcher.viewing - ? LOG_POLL_INTERVAL_MS - : STATUS_POLL_INTERVAL_MS; + watcher.failed = false; + watcher.needsPostBootstrapReconnect = false; + watcher.needsStopAfterBootstrap = false; + + const run = await this.fetchTaskRun(watcher); + const currentWatcher = this.watchers.get(key); + if (!currentWatcher || currentWatcher !== watcher) return; + + if (!run) { + this.failWatcher(key, { + title: "Failed to load cloud run", + message: "Could not fetch the cloud run state. Retry to reconnect.", + retryable: true, + }); + return; + } + + this.applyTaskRunState(watcher, run); + + if (isTerminalStatus(run.status)) { + const historicalEntries = await this.fetchAllSessionLogs(watcher); + const terminalWatcher = this.watchers.get(key); + if (!terminalWatcher || terminalWatcher !== watcher) return; + if (watcher.failed) return; + if (!historicalEntries) { + this.failWatcher(key, { + title: "Failed to load task history", + message: + "Could not load the persisted cloud task logs. Retry to reconnect.", + retryable: true, + }); + return; + } - watcher.pollTimeoutId = setTimeout(() => { - watcher.pollTimeoutId = null; - this.poll(key, false); - }, interval); + watcher.totalEntryCount = historicalEntries.length; + watcher.hasEmittedSnapshot = true; + this.emit(CloudTaskEvent.Update, { + taskId: watcher.taskId, + runId: watcher.runId, + kind: "snapshot", + newEntries: historicalEntries, + totalEntryCount: watcher.totalEntryCount, + status: watcher.lastStatus ?? undefined, + stage: watcher.lastStage, + output: watcher.lastOutput, + errorMessage: watcher.lastErrorMessage, + branch: watcher.lastBranch, + }); + this.stopWatcher(key); + return; + } + + watcher.isBootstrapping = true; + watcher.bufferedLogBatches = []; + void this.connectSse(key, { startLatest: true }); + + const historicalEntries = await this.fetchAllSessionLogs(watcher); + const bootstrappingWatcher = this.watchers.get(key); + if (!bootstrappingWatcher || bootstrappingWatcher !== watcher) return; + if (watcher.failed) return; + if (!historicalEntries) { + this.failWatcher(key, { + title: "Failed to load cloud run history", + message: + "Could not load the existing cloud run logs. Retry to reconnect.", + retryable: true, + }); + return; + } + + // Flush any pending live entries into the bootstrap buffer before snapshot. + this.flushLogBatch(key); + + watcher.totalEntryCount = historicalEntries.length; + watcher.hasEmittedSnapshot = true; + + this.emit(CloudTaskEvent.Update, { + taskId: watcher.taskId, + runId: watcher.runId, + kind: "snapshot", + newEntries: historicalEntries, + totalEntryCount: watcher.totalEntryCount, + status: watcher.lastStatus ?? undefined, + stage: watcher.lastStage, + output: watcher.lastOutput, + errorMessage: watcher.lastErrorMessage, + branch: watcher.lastBranch, + }); + + watcher.isBootstrapping = false; + this.drainBufferedLogBatches(key, historicalEntries); + + if (watcher.failed) { + return; + } + + if ( + watcher.needsStopAfterBootstrap || + isTerminalStatus(watcher.lastStatus) + ) { + watcher.needsStopAfterBootstrap = false; + this.stopWatcher(key); + return; + } + + if (watcher.needsPostBootstrapReconnect) { + watcher.needsPostBootstrapReconnect = false; + this.scheduleReconnect(key); + } } - private async poll(key: string, isSnapshot: boolean): Promise { + private async connectSse( + key: string, + options?: { startLatest?: boolean }, + ): Promise { const watcher = this.watchers.get(key); if (!watcher) return; + const controller = new AbortController(); + watcher.sseAbortController = controller; + + const url = new URL( + `${watcher.apiHost}/api/projects/${watcher.teamId}/tasks/${watcher.taskId}/runs/${watcher.runId}/stream/`, + ); + if (options?.startLatest && !watcher.lastEventId) { + url.searchParams.set("start", "latest"); + } + const headers: Record = { + Accept: "text/event-stream", + }; + if (watcher.lastEventId) { + headers["Last-Event-ID"] = watcher.lastEventId; + } + + const parser = new SseEventParser(); + const decoder = new TextDecoder(); + try { - // Only fetch logs when the user is viewing the run - const logResult = watcher.viewing - ? await this.fetchLogs(watcher) - : { newEntries: [] as StoredLogEntry[] }; - - // Fetch status if snapshot or interval elapsed - const now = Date.now(); - const statusInterval = watcher.viewing - ? STATUS_POLL_INTERVAL_VIEWING_MS - : STATUS_POLL_INTERVAL_MS; - const shouldFetchStatus = - isSnapshot || now - watcher.lastStatusPollTime >= statusInterval; - - let statusResult: TaskRunResponse | null = null; - let statusChanged = false; - - if (shouldFetchStatus) { - statusResult = await this.fetchRunStatus(watcher); - watcher.lastStatusPollTime = now; - - if (statusResult) { - statusChanged = - statusResult.status !== watcher.lastStatus || - statusResult.stage !== watcher.lastStage || - JSON.stringify(statusResult.output) !== - JSON.stringify(watcher.lastOutput) || - statusResult.error_message !== watcher.lastErrorMessage || - statusResult.branch !== watcher.lastBranch; - - if (statusChanged) { - watcher.lastStatus = statusResult.status; - watcher.lastStage = statusResult.stage ?? null; - watcher.lastOutput = statusResult.output ?? null; - watcher.lastErrorMessage = statusResult.error_message ?? null; - watcher.lastBranch = statusResult.branch ?? null; - } - } + const response = await this.authService.authenticatedFetch( + fetch, + url.toString(), + { + method: "GET", + headers, + signal: controller.signal, + }, + ); + + if (!response.ok) { + throw createStreamStatusError(response.status); } - // Determine kind and whether to emit - const hasNewLogs = logResult.newEntries.length > 0; - const hasStatusUpdate = statusChanged && statusResult; + if (!response.body) { + throw new Error("Stream response did not include a body"); + } - if (isSnapshot) { - // Always emit snapshot on first poll, even if empty - this.emit(CloudTaskEvent.Update, { - taskId: watcher.taskId, - runId: watcher.runId, - kind: "snapshot", - newEntries: logResult.newEntries, - totalEntryCount: watcher.processedLogCount, - status: statusResult?.status ?? watcher.lastStatus ?? undefined, - stage: statusResult?.stage ?? watcher.lastStage, - output: statusResult?.output ?? watcher.lastOutput, - errorMessage: statusResult?.error_message ?? watcher.lastErrorMessage, - branch: statusResult?.branch ?? watcher.lastBranch, - }); - } else { - if (hasNewLogs && hasStatusUpdate && statusResult) { - // Both changed — emit snapshot - this.emit(CloudTaskEvent.Update, { - taskId: watcher.taskId, - runId: watcher.runId, - kind: "snapshot", - newEntries: logResult.newEntries, - totalEntryCount: watcher.processedLogCount, - status: statusResult.status, - stage: statusResult.stage ?? null, - output: statusResult.output ?? null, - errorMessage: statusResult.error_message ?? null, - branch: statusResult.branch ?? null, - }); - } else if (hasNewLogs) { - this.emit(CloudTaskEvent.Update, { - taskId: watcher.taskId, - runId: watcher.runId, - kind: "logs", - newEntries: logResult.newEntries, - totalEntryCount: watcher.processedLogCount, - }); - } else if (hasStatusUpdate && statusResult) { - this.emit(CloudTaskEvent.Update, { - taskId: watcher.taskId, - runId: watcher.runId, - kind: "status", - status: statusResult.status, - stage: statusResult.stage ?? null, - output: statusResult.output ?? null, - errorMessage: statusResult.error_message ?? null, - branch: statusResult.branch ?? null, - }); + const reader = response.body.getReader(); + + while (true) { + const { done, value } = await reader.read(); + if (done) { + break; + } + + if (!value) { + continue; + } + + const chunk = decoder.decode(value, { stream: true }); + const events = parser.parse(chunk); + for (const event of events) { + this.handleSseEvent(key, event); } } - // Check for terminal status - const currentStatus = watcher.lastStatus; + const trailingEvents = parser.parse(decoder.decode()); + for (const event of trailingEvents) { + this.handleSseEvent(key, event); + } + + this.flushLogBatch(key); + + if (controller.signal.aborted) { + return; + } + + await this.handleStreamCompletion(key); + } catch (error) { + this.flushLogBatch(key); + + if (controller.signal.aborted) { + return; + } + if ( - currentStatus && - TERMINAL_STATUSES.includes( - currentStatus as (typeof TERMINAL_STATUSES)[number], - ) + error instanceof CloudTaskStreamError && + error.details.autoRetry === false ) { - // The regular poll above already fetched logs and emitted any updates. - // Only emit a final status event if we did NOT already emit a status or - // snapshot update above, to ensure the renderer knows the run is terminal. - if (!hasStatusUpdate && !isSnapshot) { + this.failWatcher(key, error.details); + return; + } + + const errorMessage = + error instanceof Error ? error.message : "Unknown stream error"; + log.warn("Cloud task stream error", { + key, + error: errorMessage, + }); + await this.handleStreamCompletion(key); + } finally { + const currentWatcher = this.watchers.get(key); + if (currentWatcher?.sseAbortController === controller) { + currentWatcher.sseAbortController = null; + } + } + } + + private handleSseEvent(key: string, event: SseEvent): void { + const watcher = this.watchers.get(key); + if (!watcher || watcher.failed) return; + + if (event.id) { + watcher.lastEventId = event.id; + } + + if (event.event === "error") { + const message = + typeof event.data === "object" && + event.data !== null && + "error" in event.data && + typeof (event.data as { error?: unknown }).error === "string" + ? (event.data as { error: string }).error + : "Unknown stream error"; + throw new Error(message); + } + + watcher.reconnectAttempts = 0; + + if (isTaskRunStateEvent(event.data)) { + if (this.applyTaskRunState(watcher, event.data)) { + if (!watcher.isBootstrapping && !isTerminalStatus(watcher.lastStatus)) { this.emit(CloudTaskEvent.Update, { taskId: watcher.taskId, runId: watcher.runId, @@ -392,34 +659,291 @@ export class CloudTaskService extends TypedEventEmitter { branch: watcher.lastBranch, }); } + } + return; + } - log.info("Cloud task reached terminal status", { - key, - status: currentStatus, - }); - this.stopWatcher(key); + watcher.pendingLogEntries.push(event.data as StoredLogEntry); + if (watcher.pendingLogEntries.length >= EVENT_BATCH_MAX_SIZE) { + this.flushLogBatch(key); + return; + } + + if (!watcher.batchFlushTimeoutId) { + watcher.batchFlushTimeoutId = setTimeout(() => { + watcher.batchFlushTimeoutId = null; + this.flushLogBatch(key); + }, EVENT_BATCH_FLUSH_MS); + } + } + + private flushLogBatch(key: string): void { + const watcher = this.watchers.get(key); + if (!watcher || watcher.pendingLogEntries.length === 0) return; + + if (watcher.batchFlushTimeoutId) { + clearTimeout(watcher.batchFlushTimeoutId); + watcher.batchFlushTimeoutId = null; + } + + const entries = watcher.pendingLogEntries; + watcher.pendingLogEntries = []; + + if (watcher.isBootstrapping) { + watcher.bufferedLogBatches.push(entries); + return; + } + + watcher.totalEntryCount += entries.length; + + this.emit(CloudTaskEvent.Update, { + taskId: watcher.taskId, + runId: watcher.runId, + kind: "logs", + newEntries: entries, + totalEntryCount: watcher.totalEntryCount, + }); + } + + private drainBufferedLogBatches( + key: string, + historicalEntries: StoredLogEntry[], + ): void { + const watcher = this.watchers.get(key); + if (!watcher || watcher.bufferedLogBatches.length === 0) return; + + const historicalCounts = new Map(); + for (const entry of historicalEntries) { + const serialized = JSON.stringify(entry); + historicalCounts.set( + serialized, + (historicalCounts.get(serialized) ?? 0) + 1, + ); + } + + for (const entries of watcher.bufferedLogBatches) { + const dedupedEntries = entries.filter((entry) => { + const serialized = JSON.stringify(entry); + const remaining = historicalCounts.get(serialized) ?? 0; + if (remaining <= 0) { + return true; + } + + historicalCounts.set(serialized, remaining - 1); + return false; + }); + + if (dedupedEntries.length === 0) { + continue; + } + + watcher.totalEntryCount += dedupedEntries.length; + this.emit(CloudTaskEvent.Update, { + taskId: watcher.taskId, + runId: watcher.runId, + kind: "logs", + newEntries: dedupedEntries, + totalEntryCount: watcher.totalEntryCount, + }); + } + + watcher.bufferedLogBatches = []; + } + + private failWatcher(key: string, error: CloudTaskConnectionError): void { + const watcher = this.watchers.get(key); + if (!watcher) return; + + watcher.failed = true; + watcher.isBootstrapping = false; + watcher.pendingLogEntries = []; + watcher.bufferedLogBatches = []; + + if (watcher.reconnectTimeoutId) { + clearTimeout(watcher.reconnectTimeoutId); + watcher.reconnectTimeoutId = null; + } + + if (watcher.batchFlushTimeoutId) { + clearTimeout(watcher.batchFlushTimeoutId); + watcher.batchFlushTimeoutId = null; + } + + watcher.sseAbortController?.abort(); + watcher.sseAbortController = null; + + this.emit(CloudTaskEvent.Update, { + taskId: watcher.taskId, + runId: watcher.runId, + kind: "error", + errorTitle: error.title, + errorMessage: error.message, + retryable: error.retryable, + }); + } + + private scheduleReconnect(key: string, error?: unknown): void { + const watcher = this.watchers.get(key); + if (!watcher || watcher.failed || isTerminalStatus(watcher.lastStatus)) { + return; + } + + if (watcher.reconnectTimeoutId) { + clearTimeout(watcher.reconnectTimeoutId); + } + + watcher.reconnectAttempts += 1; + if (watcher.reconnectAttempts > MAX_SSE_RECONNECT_ATTEMPTS) { + const details = + error instanceof CloudTaskStreamError + ? error.details + : { + title: "Cloud stream disconnected", + message: + "Lost connection to the cloud run stream. Retry to reconnect.", + retryable: true, + }; + this.failWatcher(key, details); + return; + } + + const delay = Math.min( + SSE_RECONNECT_BASE_DELAY_MS * 2 ** (watcher.reconnectAttempts - 1), + SSE_RECONNECT_MAX_DELAY_MS, + ); + + watcher.reconnectTimeoutId = setTimeout(() => { + const currentWatcher = this.watchers.get(key); + if (!currentWatcher) return; + currentWatcher.reconnectTimeoutId = null; + void this.connectSse(key, { + startLatest: + currentWatcher.isBootstrapping || currentWatcher.hasEmittedSnapshot, + }); + }, delay); + } + + private async handleStreamCompletion(key: string): Promise { + const watcher = this.watchers.get(key); + if (!watcher) return; + + const run = await this.fetchTaskRun(watcher); + const currentWatcher = this.watchers.get(key); + if (!currentWatcher || currentWatcher !== watcher) return; + + if (watcher.isBootstrapping) { + if (!run) { + watcher.needsPostBootstrapReconnect = true; return; } - } catch (error) { - log.warn("Cloud task poll error", { key, error }); + + this.applyTaskRunState(watcher, run); + if (isTerminalStatus(watcher.lastStatus)) { + watcher.needsStopAfterBootstrap = true; + } else { + watcher.needsPostBootstrapReconnect = true; + } + return; } - // Schedule next poll (only if watcher still exists) - if (this.watchers.has(key)) { - this.schedulePoll(key); + if (!run) { + this.scheduleReconnect( + key, + new CloudTaskStreamError("Failed to fetch terminal cloud run state", { + title: "Cloud run state unavailable", + message: + "Could not fetch the latest cloud run state after the stream ended. Retry to reconnect.", + retryable: true, + }), + ); + return; } + + this.applyTaskRunState(watcher, run); + + if (!isTerminalStatus(watcher.lastStatus)) { + log.warn("Cloud task stream ended before terminal status", { + key, + status: watcher.lastStatus, + }); + this.scheduleReconnect(key); + return; + } + + // Always emit terminal status — processEvent intentionally skips the emit + // for terminal states (to avoid acting on it before the stream fully ends), + // so this is the single place that notifies the renderer of completion. + this.emit(CloudTaskEvent.Update, { + taskId: watcher.taskId, + runId: watcher.runId, + kind: "status", + status: watcher.lastStatus ?? undefined, + stage: watcher.lastStage, + output: watcher.lastOutput, + errorMessage: watcher.lastErrorMessage, + branch: watcher.lastBranch, + }); + + this.stopWatcher(key); + } + + private applyTaskRunState( + watcher: WatcherState, + run: + | Pick< + TaskRunResponse, + | "status" + | "stage" + | "output" + | "error_message" + | "branch" + | "updated_at" + > + | TaskRunStateEvent, + ): boolean { + const updatedAt = run.updated_at ?? null; + if ( + updatedAt && + watcher.lastStatusUpdatedAt && + Date.parse(updatedAt) <= Date.parse(watcher.lastStatusUpdatedAt) + ) { + return false; + } + + const nextStatus = run.status ?? watcher.lastStatus; + const nextStage = run.stage ?? null; + const nextOutput = run.output ?? null; + const nextErrorMessage = run.error_message ?? null; + const nextBranch = run.branch ?? null; + + const changed = + nextStatus !== watcher.lastStatus || + nextStage !== watcher.lastStage || + JSON.stringify(nextOutput) !== JSON.stringify(watcher.lastOutput) || + nextErrorMessage !== watcher.lastErrorMessage || + nextBranch !== watcher.lastBranch; + + watcher.lastStatus = nextStatus ?? null; + watcher.lastStage = nextStage; + watcher.lastOutput = nextOutput; + watcher.lastErrorMessage = nextErrorMessage; + watcher.lastBranch = nextBranch; + if (updatedAt) { + watcher.lastStatusUpdatedAt = updatedAt; + } + + return changed; } - private async fetchLogs( + private async fetchSessionLogsPage( watcher: WatcherState, - ): Promise<{ newEntries: StoredLogEntry[] }> { + offset: number, + ): Promise { const url = new URL( `${watcher.apiHost}/api/projects/${watcher.teamId}/tasks/${watcher.taskId}/runs/${watcher.runId}/session_logs/`, ); - url.searchParams.set("limit", "5000"); - if (watcher.lastLogCursor) { - url.searchParams.set("after", watcher.lastLogCursor); - } + url.searchParams.set("limit", SESSION_LOG_PAGE_LIMIT.toString()); + url.searchParams.set("offset", offset.toString()); try { const authedResponse = await this.authService.authenticatedFetch( @@ -431,87 +955,55 @@ export class CloudTaskService extends TypedEventEmitter { ); if (!authedResponse.ok) { - log.warn("Cloud task log fetch failed", { + log.warn("Cloud task session logs fetch failed", { status: authedResponse.status, taskId: watcher.taskId, + runId: watcher.runId, + offset, }); - return { newEntries: [] }; + return null; } const raw = await authedResponse.text(); - const entries = JSON.parse(raw) as StoredLogEntry[]; - - if (entries.length === 0) { - return { newEntries: [] }; - } - - // Dedupe: skip entries we've already seen (guard against non-unique cursors) - const startIndex = this.findDedupeStartIndex(entries, watcher); - const newEntries = entries.slice(startIndex); - - if (newEntries.length > 0) { - watcher.processedLogCount += newEntries.length; - // Update cursor to last entry's timestamp - const lastEntry = newEntries[newEntries.length - 1]; - const lastTimestamp = lastEntry?.timestamp; - if (lastTimestamp) { - if (lastTimestamp === watcher.lastLogCursor) { - watcher.lastCursorSeenCount += newEntries.filter( - (entry) => entry.timestamp === lastTimestamp, - ).length; - } else { - watcher.lastLogCursor = lastTimestamp; - watcher.lastCursorSeenCount = newEntries.filter( - (entry) => entry.timestamp === lastTimestamp, - ).length; - } - } - } - - return { newEntries }; + return { + entries: JSON.parse(raw) as StoredLogEntry[], + hasMore: authedResponse.headers.get("X-Has-More") === "true", + }; } catch (error) { - log.warn("Cloud task log fetch error", { + log.warn("Cloud task session logs fetch error", { taskId: watcher.taskId, + runId: watcher.runId, + offset, error, }); - return { newEntries: [] }; + return null; } } - private findDedupeStartIndex( - entries: StoredLogEntry[], + private async fetchAllSessionLogs( watcher: WatcherState, - ): number { - // If no cursor, all entries are new - if (!watcher.lastLogCursor) return 0; + ): Promise { + const entries: StoredLogEntry[] = []; + let offset = 0; - let seenAtCursor = 0; - - // Skip entries before cursor, then skip already-seen entries at cursor - for (let i = 0; i < entries.length; i++) { - const ts = entries[i]?.timestamp; - if (!ts) { - return i; + while (true) { + const page = await this.fetchSessionLogsPage(watcher, offset); + if (!page) { + return null; } - if (ts < watcher.lastLogCursor) { - continue; + for (const entry of page.entries) { + entries.push(entry); } - - if (ts === watcher.lastLogCursor) { - seenAtCursor++; - if (seenAtCursor <= watcher.lastCursorSeenCount) { - continue; - } + if (!page.hasMore || page.entries.length === 0) { + return entries; } - return i; + offset += page.entries.length; } - // All entries are at or before cursor — nothing new - return entries.length; } - private async fetchRunStatus( + private async fetchTaskRun( watcher: WatcherState, ): Promise { const url = `${watcher.apiHost}/api/projects/${watcher.teamId}/tasks/${watcher.taskId}/runs/${watcher.runId}/`; @@ -529,6 +1021,7 @@ export class CloudTaskService extends TypedEventEmitter { log.warn("Cloud task status fetch failed", { status: authedResponse.status, taskId: watcher.taskId, + runId: watcher.runId, }); return null; } @@ -537,6 +1030,7 @@ export class CloudTaskService extends TypedEventEmitter { } catch (error) { log.warn("Cloud task status fetch error", { taskId: watcher.taskId, + runId: watcher.runId, error, }); return null; diff --git a/apps/code/src/main/services/cloud-task/sse-parser.test.ts b/apps/code/src/main/services/cloud-task/sse-parser.test.ts new file mode 100644 index 000000000..df7fdd341 --- /dev/null +++ b/apps/code/src/main/services/cloud-task/sse-parser.test.ts @@ -0,0 +1,37 @@ +import { describe, expect, it } from "vitest"; +import { SseEventParser } from "./sse-parser"; + +describe("SseEventParser", () => { + it("parses event ids and data", () => { + const parser = new SseEventParser(); + const events = parser.parse('id: 42\ndata: {"hello":"world"}\n\n'); + + expect(events).toHaveLength(1); + expect(events[0]).toEqual({ + event: undefined, + id: "42", + data: { hello: "world" }, + }); + }); + + it("parses named SSE events", () => { + const parser = new SseEventParser(); + const events = parser.parse('event: error\ndata: {"error":"boom"}\n\n'); + + expect(events).toHaveLength(1); + expect(events[0].event).toBe("error"); + expect(events[0].data).toEqual({ error: "boom" }); + }); + + it("handles chunked input", () => { + const parser = new SseEventParser(); + + expect(parser.parse("id: 1\n")).toEqual([]); + expect(parser.parse('data: {"part":')).toEqual([]); + const events = parser.parse("true}\n\n"); + + expect(events).toHaveLength(1); + expect(events[0].id).toBe("1"); + expect(events[0].data).toEqual({ part: true }); + }); +}); diff --git a/apps/code/src/main/services/cloud-task/sse-parser.ts b/apps/code/src/main/services/cloud-task/sse-parser.ts new file mode 100644 index 000000000..6b6db6723 --- /dev/null +++ b/apps/code/src/main/services/cloud-task/sse-parser.ts @@ -0,0 +1,84 @@ +export interface SseEvent { + event?: string; + id?: string; + data: unknown; +} + +export class SseEventParser { + private buffer = ""; + private currentEventName: string | null = null; + private currentEventId: string | null = null; + private currentData: string[] = []; + + parse(chunk: string): SseEvent[] { + this.buffer += chunk; + const lines = this.buffer.split("\n"); + this.buffer = lines.pop() || ""; + + const events: SseEvent[] = []; + + for (const rawLine of lines) { + const line = rawLine.endsWith("\r") ? rawLine.slice(0, -1) : rawLine; + + if (line === "") { + const event = this.flushEvent(); + if (event) { + events.push(event); + } + continue; + } + + if (line.startsWith(":")) { + continue; + } + + if (line.startsWith("event:")) { + this.currentEventName = line.slice(6).trim() || null; + continue; + } + + if (line.startsWith("id:")) { + this.currentEventId = line.slice(3).trim() || null; + continue; + } + + if (line.startsWith("data:")) { + this.currentData.push(line.slice(5).trimStart()); + } + } + + return events; + } + + reset(): void { + this.buffer = ""; + this.currentEventName = null; + this.currentEventId = null; + this.currentData = []; + } + + private flushEvent(): SseEvent | null { + if (this.currentData.length === 0) { + this.currentEventName = null; + this.currentEventId = null; + return null; + } + + const rawData = this.currentData.join("\n"); + this.currentData = []; + + try { + const data = JSON.parse(rawData); + return { + event: this.currentEventName ?? undefined, + id: this.currentEventId ?? undefined, + data, + }; + } catch { + return null; + } finally { + this.currentEventName = null; + this.currentEventId = null; + } + } +} diff --git a/apps/code/src/main/trpc/routers/cloud-task.ts b/apps/code/src/main/trpc/routers/cloud-task.ts index 7d82366ab..5f44fbb12 100644 --- a/apps/code/src/main/trpc/routers/cloud-task.ts +++ b/apps/code/src/main/trpc/routers/cloud-task.ts @@ -3,9 +3,9 @@ import { MAIN_TOKENS } from "../../di/tokens"; import { CloudTaskEvent, onUpdateInput, + retryInput, sendCommandInput, sendCommandOutput, - setViewingInput, unwatchInput, watchInput, } from "../../services/cloud-task/schemas"; @@ -24,11 +24,9 @@ export const cloudTaskRouter = router({ .input(unwatchInput) .mutation(({ input }) => getService().unwatch(input.taskId, input.runId)), - setViewing: publicProcedure - .input(setViewingInput) - .mutation(({ input }) => - getService().setViewing(input.taskId, input.runId, input.viewing), - ), + retry: publicProcedure + .input(retryInput) + .mutation(({ input }) => getService().retry(input.taskId, input.runId)), sendCommand: publicProcedure .input(sendCommandInput) diff --git a/apps/code/src/renderer/features/code-review/components/ReviewShell.tsx b/apps/code/src/renderer/features/code-review/components/ReviewShell.tsx index 060d20a72..bbd37b15b 100644 --- a/apps/code/src/renderer/features/code-review/components/ReviewShell.tsx +++ b/apps/code/src/renderer/features/code-review/components/ReviewShell.tsx @@ -542,7 +542,7 @@ export function DiffFileHeader({ fileDiff.prevName && fileDiff.prevName !== fileDiff.name ? `${fileDiff.prevName} \u2192 ${fileDiff.name}` : fileDiff.name; - const { dirPath, fileName } = splitFilePath(fullPath); + const { dirPath, fileName } = splitFilePath(fullPath ?? ""); const { additions, deletions } = sumHunkStats(fileDiff.hunks); return ( diff --git a/apps/code/src/renderer/features/command-center/components/CommandCenterSessionView.tsx b/apps/code/src/renderer/features/command-center/components/CommandCenterSessionView.tsx index f63efba5c..e518b2487 100644 --- a/apps/code/src/renderer/features/command-center/components/CommandCenterSessionView.tsx +++ b/apps/code/src/renderer/features/command-center/components/CommandCenterSessionView.tsx @@ -64,7 +64,7 @@ export function CommandCenterSessionView({ cloudBranch={cloudBranch} hasError={hasError} errorTitle={errorTitle} - errorMessage={errorMessage} + errorMessage={errorMessage ?? undefined} onRetry={isCloud ? undefined : handleRetry} onNewSession={isCloud ? undefined : handleNewSession} isInitializing={isInitializing} diff --git a/apps/code/src/renderer/features/inbox/components/detail/ReportTaskLogs.tsx b/apps/code/src/renderer/features/inbox/components/detail/ReportTaskLogs.tsx index 0b30c846f..69b836032 100644 --- a/apps/code/src/renderer/features/inbox/components/detail/ReportTaskLogs.tsx +++ b/apps/code/src/renderer/features/inbox/components/detail/ReportTaskLogs.tsx @@ -33,7 +33,7 @@ function getTaskStatusSummary(task: Task): { } { const status = task.latest_run?.status; switch (status) { - case "started": + case "queued": case "in_progress": return { label: task.latest_run?.stage diff --git a/apps/code/src/renderer/features/sessions/hooks/useSessionCallbacks.ts b/apps/code/src/renderer/features/sessions/hooks/useSessionCallbacks.ts index ab82416ef..167aa6ab0 100644 --- a/apps/code/src/renderer/features/sessions/hooks/useSessionCallbacks.ts +++ b/apps/code/src/renderer/features/sessions/hooks/useSessionCallbacks.ts @@ -86,8 +86,13 @@ export function useSessionCallbacks({ }, [taskId, setPendingContent, requestFocus]); const handleRetry = useCallback(async () => { - if (!repoPath) return; try { + if (sessionRef.current?.isCloud) { + await getSessionService().retryCloudTaskWatch(taskId); + return; + } + + if (!repoPath) return; await getSessionService().clearSessionError(taskId, repoPath); } catch (error) { log.error("Failed to clear session error", error); diff --git a/apps/code/src/renderer/features/sessions/hooks/useSessionConnection.ts b/apps/code/src/renderer/features/sessions/hooks/useSessionConnection.ts index f8afc734d..5960f3939 100644 --- a/apps/code/src/renderer/features/sessions/hooks/useSessionConnection.ts +++ b/apps/code/src/renderer/features/sessions/hooks/useSessionConnection.ts @@ -1,5 +1,7 @@ +import { useAuthStateValue } from "@features/auth/hooks/authQueries"; import { useConnectivity } from "@hooks/useConnectivity"; import { trpcClient } from "@renderer/trpc/client"; +import { getCloudUrlFromRegion } from "@shared/constants/oauth"; import type { Task } from "@shared/types"; import { useQueryClient } from "@tanstack/react-query"; import { logger } from "@utils/logger"; @@ -32,6 +34,7 @@ export function useSessionConnection({ }: UseSessionConnectionOptions) { const queryClient = useQueryClient(); const { isOnline } = useConnectivity(); + const cloudAuthState = useAuthStateValue((state) => state); useChatTitleGenerator(taskId); @@ -64,17 +67,31 @@ export function useSessionConnection({ useEffect(() => { if (!isCloud || !task.latest_run?.id) return; + if (cloudAuthState.status !== "authenticated") return; + if (!cloudAuthState.bootstrapComplete) return; + if (!cloudAuthState.projectId || !cloudAuthState.cloudRegion) return; + const runId = task.latest_run.id; const cleanup = getSessionService().watchCloudTask( task.id, runId, + getCloudUrlFromRegion(cloudAuthState.cloudRegion), + cloudAuthState.projectId, () => { queryClient.invalidateQueries({ queryKey: ["tasks"] }); }, - true, ); return cleanup; - }, [isCloud, task.id, task.latest_run?.id, queryClient]); + }, [ + cloudAuthState.bootstrapComplete, + cloudAuthState.cloudRegion, + cloudAuthState.projectId, + cloudAuthState.status, + isCloud, + queryClient, + task.id, + task.latest_run?.id, + ]); useEffect(() => { if (!repoPath) return; diff --git a/apps/code/src/renderer/features/sessions/hooks/useSessionViewState.ts b/apps/code/src/renderer/features/sessions/hooks/useSessionViewState.ts index 1cd5eee13..49302ecb4 100644 --- a/apps/code/src/renderer/features/sessions/hooks/useSessionViewState.ts +++ b/apps/code/src/renderer/features/sessions/hooks/useSessionViewState.ts @@ -14,13 +14,11 @@ export function useSessionViewState(taskId: string, task: Task) { const cloudStatus = session?.cloudStatus ?? null; const isCloudRunNotTerminal = isCloud && - (!cloudStatus || - cloudStatus === "started" || - cloudStatus === "in_progress"); + (!cloudStatus || cloudStatus === "queued" || cloudStatus === "in_progress"); const isCloudRunTerminal = isCloud && !isCloudRunNotTerminal; - const isRunning = isCloud ? true : session?.status === "connected"; - const hasError = isCloud ? false : session?.status === "error"; + const hasError = session?.status === "error"; + const isRunning = isCloud ? !hasError : session?.status === "connected"; const events = session?.events ?? []; const isPromptPending = session?.isPromptPending ?? false; @@ -30,7 +28,7 @@ export function useSessionViewState(taskId: string, task: Task) { !task.latest_run?.id && !!task.description; const isResumingExistingSession = !!task.latest_run?.id; const isInitializing = isCloud - ? !session || (events.length === 0 && isCloudRunNotTerminal) + ? !hasError && (!session || (events.length === 0 && isCloudRunNotTerminal)) : !session || (session.status === "connecting" && events.length === 0) || (session.status === "connected" && @@ -57,7 +55,9 @@ export function useSessionViewState(taskId: string, task: Task) { promptStartedAt, isInitializing, cloudBranch, - errorTitle: isCloud ? undefined : session?.errorTitle, - errorMessage: isCloud ? undefined : session?.errorMessage, + errorTitle: session?.errorTitle, + errorMessage: + session?.errorMessage ?? + (isCloud ? session?.cloudErrorMessage : undefined), }; } diff --git a/apps/code/src/renderer/features/sessions/service/service.test.ts b/apps/code/src/renderer/features/sessions/service/service.test.ts index 368be1700..29d64ff5e 100644 --- a/apps/code/src/renderer/features/sessions/service/service.test.ts +++ b/apps/code/src/renderer/features/sessions/service/service.test.ts @@ -32,6 +32,10 @@ const mockTrpcLogs = vi.hoisted(() => ({ const mockTrpcCloudTask = vi.hoisted(() => ({ sendCommand: { mutate: vi.fn() }, + watch: { mutate: vi.fn().mockResolvedValue(undefined) }, + retry: { mutate: vi.fn().mockResolvedValue(undefined) }, + unwatch: { mutate: vi.fn().mockResolvedValue(undefined) }, + onUpdate: { subscribe: vi.fn() }, })); vi.mock("@renderer/trpc/client", () => ({ @@ -259,6 +263,19 @@ describe("SessionService", () => { mockGetIsOnline.mockReturnValue(true); mockSessionStoreSetters.getSessionByTaskId.mockReturnValue(undefined); mockSessionStoreSetters.getSessions.mockReturnValue({}); + mockAuth.fetchAuthState.mockResolvedValue({ + status: "authenticated", + bootstrapComplete: true, + cloudRegion: "us", + projectId: 123, + availableProjectIds: [123], + availableOrgIds: [], + hasCodeAccess: true, + needsScopeReauth: false, + }); + mockTrpcCloudTask.onUpdate.subscribe.mockReturnValue({ + unsubscribe: vi.fn(), + }); }); describe("singleton management", () => { @@ -439,6 +456,145 @@ describe("SessionService", () => { }); }); + describe("watchCloudTask", () => { + it("subscribes to cloud updates before starting the watcher", async () => { + const service = getSessionService(); + + service.watchCloudTask( + "task-123", + "run-123", + "https://api.anthropic.com", + 123, + ); + + expect(mockTrpcCloudTask.onUpdate.subscribe).toHaveBeenCalledWith( + { taskId: "task-123", runId: "run-123" }, + expect.objectContaining({ + onData: expect.any(Function), + onError: expect.any(Function), + }), + ); + + expect(mockTrpcCloudTask.watch.mutate).toHaveBeenCalledWith({ + taskId: "task-123", + runId: "run-123", + apiHost: "https://api.anthropic.com", + teamId: 123, + }); + + expect( + mockTrpcCloudTask.onUpdate.subscribe.mock.invocationCallOrder[0], + ).toBeLessThan( + mockTrpcCloudTask.watch.mutate.mock.invocationCallOrder[0], + ); + }); + + it("ignores stale async starts when the same watcher is replaced", async () => { + const service = getSessionService(); + let resolveFirstWatchStart!: () => void; + let resolveSecondWatchStart!: () => void; + + mockTrpcCloudTask.watch.mutate + .mockImplementationOnce( + () => + new Promise((resolve) => { + resolveFirstWatchStart = resolve; + }), + ) + .mockImplementationOnce( + () => + new Promise((resolve) => { + resolveSecondWatchStart = resolve; + }), + ); + + service.watchCloudTask( + "task-123", + "run-123", + "https://api.anthropic.com", + 123, + ); + service.stopCloudTaskWatch("task-123"); + service.watchCloudTask( + "task-123", + "run-123", + "https://api.anthropic.com", + 123, + ); + + resolveSecondWatchStart(); + await Promise.resolve(); + await Promise.resolve(); + + resolveFirstWatchStart(); + await Promise.resolve(); + await Promise.resolve(); + + expect(mockTrpcCloudTask.watch.mutate).toHaveBeenCalledTimes(2); + }); + + it("sends a compensating unwatch if teardown wins the race after watch starts", async () => { + const service = getSessionService(); + let resolveWatchStart!: () => void; + mockTrpcCloudTask.unwatch.mutate.mockClear(); + + mockTrpcCloudTask.watch.mutate.mockImplementationOnce( + () => + new Promise((resolve) => { + resolveWatchStart = resolve; + }), + ); + + service.watchCloudTask( + "task-123", + "run-123", + "https://api.anthropic.com", + 123, + ); + + service.stopCloudTaskWatch("task-123"); + expect(mockTrpcCloudTask.unwatch.mutate).toHaveBeenCalledTimes(1); + + resolveWatchStart(); + await Promise.resolve(); + await Promise.resolve(); + + expect(mockTrpcCloudTask.unwatch.mutate).toHaveBeenCalledTimes(2); + expect(mockTrpcCloudTask.unwatch.mutate).toHaveBeenLastCalledWith({ + taskId: "task-123", + runId: "run-123", + }); + }); + + it("retries an errored cloud watcher in place", async () => { + const service = getSessionService(); + mockSessionStoreSetters.getSessionByTaskId.mockReturnValue({ + ...createMockSession({ + taskId: "task-123", + taskRunId: "run-123", + status: "error", + }), + isCloud: true, + }); + + await service.retryCloudTaskWatch("task-123"); + + expect(mockSessionStoreSetters.updateSession).toHaveBeenCalledWith( + "run-123", + expect.objectContaining({ + status: "disconnected", + errorTitle: undefined, + errorMessage: undefined, + isPromptPending: false, + }), + ); + expect(mockTrpcCloudTask.retry.mutate).toHaveBeenCalledWith({ + taskId: "task-123", + runId: "run-123", + }); + }); + }); + describe("reset", () => { it("clears connecting tasks", () => { const service = getSessionService(); diff --git a/apps/code/src/renderer/features/sessions/service/service.ts b/apps/code/src/renderer/features/sessions/service/service.ts index bf0a2b211..4604658d5 100644 --- a/apps/code/src/renderer/features/sessions/service/service.ts +++ b/apps/code/src/renderer/features/sessions/service/service.ts @@ -43,6 +43,7 @@ import { type EffortLevel, type ExecutionMode, effortLevelSchema, + isTerminalStatus, type Task, } from "@shared/types"; import { ANALYTICS_EVENTS } from "@shared/types/analytics"; @@ -68,7 +69,6 @@ import { } from "@utils/session"; const log = logger.scope("session-service"); -const TERMINAL_CLOUD_STATUSES = new Set(["completed", "failed", "cancelled"]); interface AuthCredentials { apiHost: string; @@ -112,6 +112,7 @@ export function resetSessionService(): void { export class SessionService { private connectingTasks = new Map>(); + private nextCloudTaskWatchToken = 0; private subscriptions = new Map< string, { @@ -124,6 +125,9 @@ export class SessionService { string, { runId: string; + apiHost: string; + teamId: number; + startToken: number; subscription: { unsubscribe: () => void }; onStatusChange?: () => void; } @@ -1181,18 +1185,17 @@ export class SessionService { prompt: string | ContentBlock[], options?: { skipQueueGuard?: boolean }, ): Promise<{ stopReason: string }> { - if ( - session.cloudStatus && - TERMINAL_CLOUD_STATUSES.has(session.cloudStatus) - ) { - return this.resumeCloudRun(session, prompt); + const rawPromptText = extractPromptText(prompt); + if (!rawPromptText.trim()) { + return { stopReason: "empty" }; + } + + if (isTerminalStatus(session.cloudStatus)) { + return this.resumeCloudRun(session, rawPromptText); } if (!options?.skipQueueGuard && session.isPromptPending) { - sessionStoreSetters.enqueueMessage( - session.taskId, - typeof prompt === "string" ? prompt : extractPromptText(prompt), - ); + sessionStoreSetters.enqueueMessage(session.taskId, rawPromptText); log.info("Cloud message queued", { taskId: session.taskId, queueLength: session.messageQueue.length + 1, @@ -1341,6 +1344,10 @@ export class SessionService { if (!client) { throw new Error("Authentication required for cloud commands"); } + const auth = await this.getCloudCommandAuth(); + if (!auth) { + throw new Error("Authentication required for cloud commands"); + } const { blocks, promptText } = await this.prepareCloudPrompt(prompt); @@ -1427,7 +1434,7 @@ export class SessionService { // in run state (pending_user_message), NOT via user_message command. // Start the watcher immediately so we don't miss status updates. - this.watchCloudTask(session.taskId, newRun.id); + this.watchCloudTask(session.taskId, newRun.id, auth.apiHost, auth.teamId); // Invalidate task queries so the UI picks up the new run metadata queryClient.invalidateQueries({ queryKey: ["tasks"] }); @@ -1443,10 +1450,7 @@ export class SessionService { } private async cancelCloudPrompt(session: AgentSession): Promise { - if ( - session.cloudStatus && - TERMINAL_CLOUD_STATUSES.has(session.cloudStatus) - ) { + if (isTerminalStatus(session.cloudStatus)) { log.info("Skipping cancel for terminal cloud run", { taskId: session.taskId, status: session.cloudStatus, @@ -1885,35 +1889,31 @@ export class SessionService { /** * Start watching a cloud task via main-process CloudTaskService. * - * The watcher stays alive across navigation. On navigate-away the caller - * invokes the returned cleanup which only toggles viewing off (background - * status polling continues at 60s). On navigate-back this method detects - * the existing watcher, toggles viewing back on, and returns a new cleanup. - * - * A fresh watcher is created only on first visit or when the runId changes - * (new run started). Terminal status triggers full teardown from within - * handleCloudTaskUpdate via stopCloudTaskWatch(). + * The watcher stays alive across navigation. A fresh watcher is created only + * on first visit or when the runId changes (new run started). Terminal + * status triggers full teardown from within handleCloudTaskUpdate via + * stopCloudTaskWatch(). */ watchCloudTask( taskId: string, runId: string, + apiHost: string, + teamId: number, onStatusChange?: () => void, - viewing?: boolean, ): () => void { const taskRunId = runId; + const startToken = ++this.nextCloudTaskWatchToken; const existingWatcher = this.cloudTaskWatchers.get(taskId); - // Resuming same run — just toggle viewing back on - if (existingWatcher && existingWatcher.runId === runId) { + // Resuming same run — reuse the existing watcher. + if ( + existingWatcher && + existingWatcher.runId === runId && + existingWatcher.apiHost === apiHost && + existingWatcher.teamId === teamId + ) { existingWatcher.onStatusChange = onStatusChange; - trpcClient.cloudTask.setViewing - .mutate({ taskId, runId, viewing: viewing ?? true }) - .catch(() => {}); - return () => { - trpcClient.cloudTask.setViewing - .mutate({ taskId, runId, viewing: false }) - .catch(() => {}); - }; + return () => {}; } // Different run — full cleanup of old watcher first @@ -1935,26 +1935,8 @@ export class SessionService { }); } - void fetchAuthState() - .then((authState) => { - if (!authState.projectId || !authState.cloudRegion) { - log.warn("No auth for cloud task watcher", { taskId }); - return; - } - - return trpcClient.cloudTask.watch.mutate({ - taskId, - runId, - apiHost: getCloudUrlFromRegion(authState.cloudRegion), - teamId: authState.projectId, - viewing, - }); - }) - .catch((err: unknown) => - log.warn("Failed to start cloud task watcher", { taskId, err }), - ); - - // Subscribe to updates + // Subscribe before starting the main-process watcher so the first replayed + // SSE/log burst cannot race ahead of the renderer subscription. const subscription = trpcClient.cloudTask.onUpdate.subscribe( { taskId, runId }, { @@ -1962,7 +1944,9 @@ export class SessionService { this.handleCloudTaskUpdate(taskRunId, update); const watcher = this.cloudTaskWatchers.get(taskId); if ( - (update.kind === "status" || update.kind === "snapshot") && + (update.kind === "status" || + update.kind === "snapshot" || + update.kind === "error") && watcher?.onStatusChange ) { watcher.onStatusChange(); @@ -1975,15 +1959,50 @@ export class SessionService { this.cloudTaskWatchers.set(taskId, { runId, + apiHost, + teamId, + startToken, subscription, onStatusChange, }); - return () => { - trpcClient.cloudTask.setViewing - .mutate({ taskId, runId, viewing: false }) - .catch(() => {}); - }; + // Start main-process watcher after the subscription is attached. + void (async () => { + try { + if (!this.isCurrentCloudTaskWatcher(taskId, runId, startToken)) { + return; + } + + await trpcClient.cloudTask.watch.mutate({ + taskId, + runId, + apiHost, + teamId, + }); + + // If the local watcher was torn down while the watch request was in + // flight, send a compensating unwatch after the start request lands. + if (!this.isCurrentCloudTaskWatcher(taskId, runId, startToken)) { + await trpcClient.cloudTask.unwatch.mutate({ taskId, runId }); + } + } catch (err: unknown) { + if (!this.isCurrentCloudTaskWatcher(taskId, runId, startToken)) { + return; + } + log.warn("Failed to start cloud task watcher", { taskId, err }); + } + })(); + + return () => {}; + } + + private isCurrentCloudTaskWatcher( + taskId: string, + runId: string, + startToken: number, + ): boolean { + const watcher = this.cloudTaskWatchers.get(taskId); + return watcher?.runId === runId && watcher.startToken === startToken; } /** @@ -2003,6 +2022,37 @@ export class SessionService { ); } + async retryCloudTaskWatch(taskId: string): Promise { + const session = sessionStoreSetters.getSessionByTaskId(taskId); + if (!session?.isCloud) { + throw new Error("No active cloud session for task"); + } + + const previousErrorTitle = session.errorTitle; + const previousErrorMessage = session.errorMessage; + + sessionStoreSetters.updateSession(session.taskRunId, { + status: "disconnected", + errorTitle: undefined, + errorMessage: undefined, + isPromptPending: false, + }); + + try { + await trpcClient.cloudTask.retry.mutate({ + taskId, + runId: session.taskRunId, + }); + } catch (error) { + sessionStoreSetters.updateSession(session.taskRunId, { + status: "error", + errorTitle: previousErrorTitle, + errorMessage: previousErrorMessage, + }); + throw error; + } + } + public updateSessionTaskTitle(taskId: string, taskTitle: string): void { const session = sessionStoreSetters.getSessionByTaskId(taskId); if (!session) return; @@ -2016,6 +2066,18 @@ export class SessionService { taskRunId: string, update: CloudTaskUpdatePayload, ): void { + if (update.kind === "error") { + sessionStoreSetters.updateSession(taskRunId, { + status: "error", + errorTitle: update.errorTitle, + errorMessage: + update.errorMessage ?? + "Lost connection to the cloud run. Retry to reconnect.", + isPromptPending: false, + }); + return; + } + // Append new log entries with dedup guard if (update.newEntries && update.newEntries.length > 0) { const session = sessionStoreSetters.getSessions()[taskRunId]; @@ -2060,7 +2122,7 @@ export class SessionService { } } - // Flush queued messages when a cloud turn completes (detected via log polling) + // Flush queued messages when a cloud turn completes (detected via live log updates) const sessionAfterLogs = sessionStoreSetters.getSessions()[taskRunId]; if ( sessionAfterLogs && @@ -2097,12 +2159,13 @@ export class SessionService { }); this.sendQueuedCloudMessages(session.taskId).catch(() => { // Retries exhausted — message was re-enqueued by - // sendQueuedCloudMessages, poll-based flush will keep trying + // sendQueuedCloudMessages, future stream-based completion detection + // will keep trying }); } } - if (update.status && TERMINAL_CLOUD_STATUSES.has(update.status)) { + if (isTerminalStatus(update.status)) { // Clean up any pending resume messages that couldn't be sent const session = sessionStoreSetters.getSessions()[taskRunId]; if ( diff --git a/apps/code/src/renderer/features/sessions/stores/sessionStore.ts b/apps/code/src/renderer/features/sessions/stores/sessionStore.ts index 882b018bb..38808cb6c 100644 --- a/apps/code/src/renderer/features/sessions/stores/sessionStore.ts +++ b/apps/code/src/renderer/features/sessions/stores/sessionStore.ts @@ -5,7 +5,7 @@ import type { SessionConfigSelectOption, SessionConfigSelectOptions, } from "@agentclientprotocol/sdk"; -import type { ExecutionMode } from "@shared/types"; +import type { ExecutionMode, TaskRunStatus } from "@shared/types"; import type { AcpMessage } from "@shared/types/session-events"; import { create } from "zustand"; import { immer } from "zustand/middleware/immer"; @@ -22,12 +22,7 @@ export interface QueuedMessage { queuedAt: number; } -export type TaskRunStatus = - | "started" - | "in_progress" - | "completed" - | "failed" - | "cancelled"; +export type { TaskRunStatus }; export type OptimisticItem = { type: "user_message"; diff --git a/apps/code/src/renderer/features/sidebar/components/items/TaskItem.tsx b/apps/code/src/renderer/features/sidebar/components/items/TaskItem.tsx index 11477222d..9af268cf5 100644 --- a/apps/code/src/renderer/features/sidebar/components/items/TaskItem.tsx +++ b/apps/code/src/renderer/features/sidebar/components/items/TaskItem.tsx @@ -11,6 +11,7 @@ import { Pause, PushPin, } from "@phosphor-icons/react"; +import type { TaskRunStatus } from "@shared/types"; import { selectIsFocusedOnWorktree, useFocusStore } from "@stores/focusStore"; import { useCallback, useEffect, useRef, useState } from "react"; import { SidebarItem } from "../SidebarItem"; @@ -27,12 +28,7 @@ interface TaskItemProps { isPinned?: boolean; isSuspended?: boolean; needsPermission?: boolean; - taskRunStatus?: - | "started" - | "in_progress" - | "completed" - | "failed" - | "cancelled"; + taskRunStatus?: TaskRunStatus; timestamp?: number; isEditing?: boolean; onClick: () => void; @@ -135,7 +131,7 @@ function CloudStatusIcon({ }: { taskRunStatus?: TaskItemProps["taskRunStatus"]; }) { - if (taskRunStatus === "started" || taskRunStatus === "in_progress") { + if (taskRunStatus === "queued" || taskRunStatus === "in_progress") { return ( @@ -203,6 +199,12 @@ export function TaskItem({ const isWorktreeTask = workspaceMode === "worktree"; const isCloudTask = workspaceMode === "cloud"; + const isTerminalCloud = + isCloudTask && + (taskRunStatus === "completed" || + taskRunStatus === "failed" || + taskRunStatus === "cancelled"); + const icon = isSuspended ? ( @@ -211,16 +213,18 @@ export function TaskItem({ ) : needsPermission ? ( + ) : isTerminalCloud ? ( + ) : isGenerating ? ( + ) : isCloudTask ? ( + ) : isUnread ? ( ) : isPinned ? ( - ) : isCloudTask ? ( - ) : isWorktreeTask ? ( isFocused ? ( diff --git a/apps/code/src/renderer/features/sidebar/hooks/useSidebarData.ts b/apps/code/src/renderer/features/sidebar/hooks/useSidebarData.ts index 24a582062..cf8d21cab 100644 --- a/apps/code/src/renderer/features/sidebar/hooks/useSidebarData.ts +++ b/apps/code/src/renderer/features/sidebar/hooks/useSidebarData.ts @@ -4,7 +4,7 @@ import { useSuspendedTaskIds } from "@features/suspension/hooks/useSuspendedTask import { useTasks } from "@features/tasks/hooks/useTasks"; import { useWorkspaces } from "@features/workspace/hooks/useWorkspace"; import { getTaskRepository, parseRepository } from "@renderer/utils/repository"; -import type { Task } from "@shared/types"; +import type { Task, TaskRunStatus } from "@shared/types"; import { useEffect, useMemo, useRef } from "react"; import { useSidebarStore } from "../stores/sidebarStore"; import type { SortMode } from "../types"; @@ -28,12 +28,7 @@ export interface TaskData { repository: TaskRepositoryInfo | null; isSuspended: boolean; folderId?: string; - taskRunStatus?: - | "started" - | "in_progress" - | "completed" - | "failed" - | "cancelled"; + taskRunStatus?: TaskRunStatus; taskRunEnvironment?: "local" | "cloud"; } @@ -220,7 +215,7 @@ export function useSidebarData({ needsPermission: (session?.pendingPermissions?.size ?? 0) > 0, repository: getRepositoryInfo(task, workspace?.folderPath), folderId: workspace?.folderId || undefined, - taskRunStatus: task.latest_run?.status, + taskRunStatus: session?.cloudStatus ?? task.latest_run?.status, taskRunEnvironment: task.latest_run?.environment, }; }); diff --git a/apps/code/src/renderer/features/task-detail/components/TaskLogsPanel.tsx b/apps/code/src/renderer/features/task-detail/components/TaskLogsPanel.tsx index 19ab7b458..1347c7347 100644 --- a/apps/code/src/renderer/features/task-detail/components/TaskLogsPanel.tsx +++ b/apps/code/src/renderer/features/task-detail/components/TaskLogsPanel.tsx @@ -155,9 +155,9 @@ export function TaskLogsPanel({ taskId, task, hideInput }: TaskLogsPanelProps) { cloudDiffStats={cloudDiffStats} hasError={hasError} errorTitle={errorTitle} - errorMessage={errorMessage} + errorMessage={errorMessage ?? undefined} hideInput={hideInput} - onRetry={isCloud ? undefined : handleRetry} + onRetry={handleRetry} onNewSession={isCloud ? undefined : handleNewSession} isInitializing={isInitializing} slackThreadUrl={slackThreadUrl} diff --git a/apps/code/src/renderer/features/task-detail/hooks/useCloudRunState.ts b/apps/code/src/renderer/features/task-detail/hooks/useCloudRunState.ts index eec0f2c63..d4e84466c 100644 --- a/apps/code/src/renderer/features/task-detail/hooks/useCloudRunState.ts +++ b/apps/code/src/renderer/features/task-detail/hooks/useCloudRunState.ts @@ -26,7 +26,7 @@ export function useCloudRunState(taskId: string, task: Task) { const cloudStatus = session?.cloudStatus ?? freshTask.latest_run?.status ?? null; const isRunActive = - cloudStatus === "started" || + cloudStatus === "queued" || cloudStatus === "in_progress" || (cloudStatus === null && session != null); diff --git a/apps/code/src/renderer/features/tasks/stores/taskStore.types.ts b/apps/code/src/renderer/features/tasks/stores/taskStore.types.ts index 6e7ca98b4..25e55cfd9 100644 --- a/apps/code/src/renderer/features/tasks/stores/taskStore.types.ts +++ b/apps/code/src/renderer/features/tasks/stores/taskStore.types.ts @@ -36,7 +36,7 @@ export type FilterMatchMode = "all" | "any"; export const TASK_STATUS_ORDER: string[] = [ "failed", "in_progress", - "started", + "queued", "completed", "backlog", ]; diff --git a/apps/code/src/renderer/sagas/task/task-creation.test.ts b/apps/code/src/renderer/sagas/task/task-creation.test.ts index aa256cea1..6fe85fc75 100644 --- a/apps/code/src/renderer/sagas/task/task-creation.test.ts +++ b/apps/code/src/renderer/sagas/task/task-creation.test.ts @@ -94,7 +94,7 @@ const createRun = (overrides: Partial = {}): TaskRun => ({ team: 1, branch: "release/remembered-branch", environment: "cloud", - status: "started", + status: "queued", log_url: "https://example.com/logs/run-123", error_message: null, output: null, diff --git a/apps/code/src/shared/types.ts b/apps/code/src/shared/types.ts index dcfb4513c..e65fd9812 100644 --- a/apps/code/src/shared/types.ts +++ b/apps/code/src/shared/types.ts @@ -45,6 +45,26 @@ export interface Task { latest_run?: TaskRun; } +export type TaskRunStatus = + | "not_started" + | "queued" + | "in_progress" + | "completed" + | "failed" + | "cancelled"; + +export const TERMINAL_STATUSES = ["completed", "failed", "cancelled"] as const; + +export function isTerminalStatus( + status: TaskRunStatus | string | null | undefined, +): boolean { + return ( + status !== null && + status !== undefined && + TERMINAL_STATUSES.includes(status as (typeof TERMINAL_STATUSES)[number]) + ); +} + export interface TaskRun { id: string; task: string; // Task ID @@ -52,7 +72,7 @@ export interface TaskRun { branch: string | null; stage?: string | null; // Current stage (e.g., 'research', 'plan', 'build') environment?: "local" | "cloud"; - status: "started" | "in_progress" | "completed" | "failed" | "cancelled"; + status: TaskRunStatus; log_url: string; error_message: string | null; output: Record | null; // Structured output (PR URL, commit SHA, etc.) @@ -89,7 +109,7 @@ export interface SandboxEnvironmentInput { private?: boolean; } -export type CloudTaskUpdateKind = "logs" | "status" | "snapshot"; +export type CloudTaskUpdateKind = "logs" | "status" | "snapshot" | "error"; export interface CloudTaskUpdatePayload { taskId: string; @@ -104,6 +124,9 @@ export interface CloudTaskUpdatePayload { output?: Record | null; errorMessage?: string | null; branch?: string | null; + // Connection/error fields (present when kind is "error") + errorTitle?: string; + retryable?: boolean; } // Mention types for editors