Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions packages/opencode/src/session/message-v2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -897,6 +897,108 @@ export namespace MessageV2 {
return result
}

// ── Lightweight conversation loading ──────────────────────────────────
//
// filterCompactedLazy avoids materializing the full WithParts[] array.
// Phase 1: scan message *info only* (no parts) newest→oldest to find
// the compaction boundary and collect message IDs.
// Phase 2: load parts only for messages after the boundary.
//
// For a 7,000-message session with no compaction this still loads all
// parts, but for compacted sessions it skips everything before the
// summary — which is the common case for long-running sessions.

/** Scan message *info* only (no parts) newest→oldest to locate the
 * compaction boundary for a session.
 *
 * Walks the MessageTable in pages of 50 using a keyset cursor
 * (time_created, id descending). For each assistant message that looks
 * like a finished, error-free compaction summary, its parentID is
 * remembered; when the matching user message is reached, that single
 * message's parts are loaded to confirm it carries a "compaction" part.
 * The first confirmed boundary stops the scan.
 *
 * Returns the raw MessageTable rows from the boundary message forward
 * (i.e. the boundary row plus everything newer), in oldest-first order.
 * If no boundary exists, every row of the session is returned.
 *
 * NOTE(review): declared async but contains no await — presumably
 * Database.use returns the callback result synchronously here; confirm
 * against the Database helper. */
async function scanBoundary(sessionID: SessionID) {
  const size = 50
  let before: string | undefined
  // Accumulates rows newest-first while scanning; reversed before return.
  const rows: (typeof MessageTable.$inferSelect)[] = []
  // parentIDs of assistant summary messages seen so far — presumably the
  // id of the user message that triggered the compaction; verify against
  // the writer of the summary messages.
  const completed = new Set<string>()

  while (true) {
    // Keyset pagination: decode the opaque cursor of the last row of the
    // previous page, and constrain the next page to strictly older rows.
    const cursor_before = before ? cursor.decode(before) : undefined
    const where = cursor_before
      ? and(eq(MessageTable.session_id, sessionID), older(cursor_before))
      : eq(MessageTable.session_id, sessionID)
    const batch = Database.use((db) =>
      db
        .select()
        .from(MessageTable)
        .where(where)
        .orderBy(desc(MessageTable.time_created), desc(MessageTable.id))
        // Fetch one extra row purely to detect whether another page exists.
        .limit(size + 1)
        .all(),
    )
    if (batch.length === 0) break
    const more = batch.length > size
    const page = more ? batch.slice(0, size) : batch

    let found = false
    for (const row of page) {
      rows.push(row)
      const msg = info(row)
      // A finished, error-free assistant summary marks its parent user
      // message as a candidate compaction boundary.
      if (
        msg.role === "assistant" &&
        (msg as Assistant).summary &&
        (msg as Assistant).finish &&
        !(msg as Assistant).error
      )
        completed.add(msg.parentID)
      if (msg.role === "user" && completed.has(msg.id)) {
        // Potential boundary — need to check parts for compaction type.
        // Only load parts for THIS message to check; this is the sole
        // place the scan touches the PartTable.
        const partRows = Database.use((db) =>
          db.select().from(PartTable).where(eq(PartTable.message_id, row.id)).all(),
        )
        if (partRows.some((p) => (p.data as any).type === "compaction")) {
          found = true
          break
        }
      }
    }
    // Stop on a confirmed boundary, or when the session has no more pages
    // (in which case rows holds the entire session).
    if (found || !more) break
    const tail = page.at(-1)!
    before = cursor.encode({ id: tail.id, time: tail.time_created })
  }
  // Collected newest-first; callers expect oldest-first.
  rows.reverse()
  return rows
}

/** Load the conversation from the compaction boundary forward, parts
 * included.
 *
 * Strategy: one cheap probe query reads the 50 newest message infos (no
 * parts). If any of them is a finished, error-free assistant summary, the
 * session is treated as compacted and the two-pass path runs — an
 * info-only scan to find the boundary, then part hydration for only the
 * rows after it. Otherwise the original single-pass
 * filterCompacted(stream()) is used, since it loads parts alongside info
 * and avoids a redundant info-only walk over an uncompacted session. */
export async function filterCompactedLazy(sessionID: SessionID) {
  // Probe: newest 50 infos in one query, zero parts loaded.
  const recent = Database.use((db) =>
    db
      .select()
      .from(MessageTable)
      .where(eq(MessageTable.session_id, sessionID))
      .orderBy(desc(MessageTable.time_created), desc(MessageTable.id))
      .limit(50)
      .all(),
  )
  let hasSummary = false
  for (const row of recent) {
    const msg = info(row)
    if (msg.role !== "assistant") continue
    const assistant = msg as Assistant
    if (assistant.summary && assistant.finish && !assistant.error) {
      hasSummary = true
      break
    }
  }
  if (hasSummary) {
    // Compacted: scan infos for the boundary, then hydrate only the
    // messages from the boundary onward.
    const boundaryRows = await scanBoundary(sessionID)
    return hydrate(boundaryRows)
  }
  // No recent summary: single-pass load of the whole session.
  return filterCompacted(stream(sessionID))
}

export function fromError(e: unknown, ctx: { providerID: ProviderID }): NonNullable<Assistant["error"]> {
switch (true) {
case e instanceof DOMException && e.name === "AbortError":
Expand Down
45 changes: 43 additions & 2 deletions packages/opencode/src/session/prompt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -294,11 +294,20 @@ export namespace SessionPrompt {

let step = 0
const session = await Session.get(sessionID)
// filterCompactedLazy scans message info without loading parts to find
// the compaction boundary, then hydrates parts only for messages after
// it. For a 7K-message session with compaction at message #100, this
// loads ~100 messages' parts instead of all 7K.
let msgs = await MessageV2.filterCompactedLazy(sessionID)
let needsFullReload = false
while (true) {
if (needsFullReload) {
msgs = await MessageV2.filterCompactedLazy(sessionID)
needsFullReload = false
}
SessionStatus.set(sessionID, { type: "busy" })
log.info("loop", { step, sessionID })
if (abort.aborted) break
let msgs = await MessageV2.filterCompacted(MessageV2.stream(sessionID))

let lastUser: MessageV2.User | undefined
let lastAssistant: MessageV2.Assistant | undefined
Expand Down Expand Up @@ -525,6 +534,7 @@ export namespace SessionPrompt {
} satisfies MessageV2.TextPart)
}

needsFullReload = true
continue
}

Expand All @@ -539,6 +549,7 @@ export namespace SessionPrompt {
overflow: task.overflow,
})
if (result === "stop") break
needsFullReload = true
continue
}

Expand All @@ -554,6 +565,7 @@ export namespace SessionPrompt {
model: lastUser.model,
auto: true,
})
needsFullReload = true
continue
}

Expand Down Expand Up @@ -663,6 +675,24 @@ export namespace SessionPrompt {
system.push(STRUCTURED_OUTPUT_SYSTEM_PROMPT)
}

// Context-window windowing: only convert messages that fit in the
// LLM context window to ModelMessage format. This avoids creating
// ~300MB of wrapper objects for messages the provider will discard.
const budget = (model.limit.input || model.limit.context || 200_000) * 4 // chars
let used = 0
let windowStart = msgs.length
for (let i = msgs.length - 1; i >= 0; i--) {
for (const part of msgs[i].parts) {
if (part.type === "text") used += part.text.length
else if (part.type === "tool" && part.state.status === "completed")
used += (part.state.output?.length ?? 0) + JSON.stringify(part.state.input).length
else if (part.type === "reasoning") used += part.text.length
}
if (used > budget) break
windowStart = i
}
const window = windowStart > 0 ? msgs.slice(windowStart) : msgs

const result = await processor.process({
user: lastUser,
agent,
Expand All @@ -671,7 +701,7 @@ export namespace SessionPrompt {
sessionID,
system,
messages: [
...MessageV2.toModelMessages(msgs, model),
...MessageV2.toModelMessages(window, model),
...(isLastStep
? [
{
Expand Down Expand Up @@ -719,6 +749,17 @@ export namespace SessionPrompt {
auto: true,
overflow: !processor.message.finish,
})
needsFullReload = true
} else {
// Normal tool-call continuation: fetch the latest page to pick up
// new assistant messages and tool results, then merge with the
// cached history to avoid reloading the entire conversation.
const fresh = await MessageV2.page({ sessionID, limit: 200 })
const existing = new Map(msgs.map((m) => [m.info.id, m]))
for (const msg of fresh.items) existing.set(msg.info.id, msg)
msgs = Array.from(existing.values()).sort((a, b) =>
a.info.id < b.info.id ? -1 : a.info.id > b.info.id ? 1 : 0,
)
}
continue
}
Expand Down
Loading