Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,20 @@ jobs:
fail-fast: false
with:
os: ${{ matrix.os }}

xNuts-plugin-agent:
needs: linux-unit-tests
strategy:
fail-fast: false
matrix:
os: [ubuntu-latest, windows-latest]
command:
- 'yarn test:nuts'
uses: salesforcecli/github-workflows/.github/workflows/externalNut.yml@main
with:
packageName: '@salesforce/agents'
externalProjectGitUrl: 'https://github.com/salesforcecli/plugin-agent'
command: ${{matrix.command}}
os: ${{matrix.os}}
useCache: false
secrets: inherit
26 changes: 25 additions & 1 deletion src/agents/agentBase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import { readFile, readdir, cp, mkdir } from 'node:fs/promises';
import { join } from 'node:path';
import { Connection, SfError } from '@salesforce/core';
import { AgentPreviewInterface, type AgentPreviewSendResponse, type PlannerResponse, PreviewMetadata } from '../types';
import { getHistoryDir, TranscriptEntry } from '../utils';
import { getHistoryDir, SessionHistoryBuffer, TranscriptEntry } from '../utils';

/**
* Abstract base class for agent preview functionality.
Expand All @@ -30,6 +30,8 @@ export abstract class AgentBase {
public name: string | undefined;
protected sessionId: string | undefined;
protected historyDir: string | undefined;
protected historyBuffer: SessionHistoryBuffer | undefined;
protected turnCounter = 0;
protected apexDebugging: boolean | undefined;
protected planIds = new Set<string>();
public abstract preview: AgentPreviewInterface;
Expand All @@ -52,6 +54,28 @@ export abstract class AgentBase {
return getHistoryDir(await this.getAgentIdForStorage(), this.sessionId);
}

/**
* Resume an existing session by loading session state from disk
*
* @param sessionId The session ID to resume
*/
public async resumeSession(sessionId: string): Promise<void> {
this.sessionId = sessionId;
const agentId = await this.getAgentIdForStorage();
this.historyDir = await getHistoryDir(agentId, sessionId);

// Load session data from disk
const { buffer, turnCount } = await SessionHistoryBuffer.fromDisk(this.historyDir, sessionId, agentId);
this.historyBuffer = buffer;
this.turnCounter = turnCount;

// Load planIds from buffer
const history = await this.getHistoryFromDisc();
if (history.metadata?.planIds) {
history.metadata.planIds.forEach((planId) => this.planIds.add(planId));
}
}

/**
* Get all traces from the current session
* Reads traces from the session directory if available, otherwise fetches from API
Expand Down
124 changes: 64 additions & 60 deletions src/agents/productionAgent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,15 @@ import {
} from '../types';
import { MaybeMock } from '../maybe-mock';
import {
appendTranscriptToHistory,
writeMetaFileToHistory,
updateMetadataEndTime,
writeTraceToHistory,
requestWithEndpointFallback,
getHistoryDir,
getAllHistory,
TranscriptEntry,
logSessionToIndex,
getAgentIndexDir,
logTurnToHistory,
recordTraceForTurn,
SessionHistoryBuffer,
} from '../utils';
import { createTraceFlag, findTraceFlag, getDebugLog } from '../apexUtils';
import { AgentBase } from './agentBase';
Expand Down Expand Up @@ -221,6 +220,15 @@ export class ProductionAgent extends AgentBase {
throw SfError.create({ name: 'noSessionId', message: 'Agent not started, please call .start() first' });
}

// Auto-resume session if historyBuffer is not initialized but sessionId is set
if (!this.historyBuffer) {
await this.resumeSession(this.sessionId);
// Double-check that resumeSession succeeded
if (!this.historyBuffer) {
throw SfError.create({ name: 'noHistoryBuffer', message: 'Failed to resume session' });
}
}

const url = `${this.apiBase}/sessions/${this.sessionId}/messages`;

const body = {
Expand All @@ -247,16 +255,14 @@ export class ProductionAgent extends AgentBase {
this.historyDir = await getHistoryDir(agentId, this.sessionId);
}

void appendTranscriptToHistory(
{
timestamp: new Date().toISOString(),
agentId,
sessionId: this.sessionId,
role: 'user',
text: message,
},
this.historyDir
);
const userEntry = {
timestamp: new Date().toISOString(),
agentId,
sessionId: this.sessionId,
role: 'user' as const,
text: message,
};
await logTurnToHistory(userEntry, ++this.turnCounter, this.historyDir, this.historyBuffer);

const response = await requestWithEndpointFallback<AgentPreviewSendResponse>(this.connection, {
method: 'POST',
Expand All @@ -270,24 +276,25 @@ export class ProductionAgent extends AgentBase {
const planId = response.messages.at(0)!.planId;
this.planIds.add(planId);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: now that SessionHistoryBuffer stores the planId, it looks that planIds from AgentBase is not used anymore.


await appendTranscriptToHistory(
{
timestamp: new Date().toISOString(),
agentId,
sessionId: this.sessionId,
role: 'agent',
text: response.messages.at(0)?.message,
raw: response.messages,
},
this.historyDir
);
const agentEntry = {
timestamp: new Date().toISOString(),
agentId,
sessionId: this.sessionId,
role: 'agent' as const,
text: response.messages.at(0)?.message,
raw: response.messages,
};
const agentTurn = ++this.turnCounter;
await logTurnToHistory(agentEntry, agentTurn, this.historyDir, this.historyBuffer);

// Fetch and write trace immediately if available
if (planId) {
const trace = await this.getTrace(planId);
await writeTraceToHistory(planId, trace, this.historyDir);
await recordTraceForTurn(this.historyDir, agentTurn, planId, undefined, this.historyBuffer);
}

// Flush buffer to keep turn-index.json and metadata.json up to date
await this.historyBuffer.flush();

if (this.apexDebugging && this.canApexDebug()) {
const apexLog = await getDebugLog(this.connection, start, Date.now());
if (apexLog) {
Expand Down Expand Up @@ -367,26 +374,22 @@ export class ProductionAgent extends AgentBase {
this.historyDir = await getHistoryDir(agentId, response.sessionId);
const startTime = new Date().toISOString();

await appendTranscriptToHistory(
{
timestamp: startTime,
agentId,
sessionId: response.sessionId,
role: 'agent',
text: response.messages.map((m) => m.message).join('\n'),
raw: response.messages,
},
this.historyDir
);
// Initialize history buffer (no file I/O yet)
this.historyBuffer = new SessionHistoryBuffer(this.historyDir, response.sessionId, agentId, startTime);
this.turnCounter = 0;

// Write initial metadata
await writeMetaFileToHistory(this.historyDir, {
sessionId: response.sessionId,
const initialEntry = {
timestamp: startTime,
agentId,
startTime,
apexDebugging: this.apexDebugging,
planIds: [],
});
sessionId: response.sessionId,
role: 'agent' as const,
text: response.messages.map((m) => m.message).join('\n'),
raw: response.messages,
};
await logTurnToHistory(initialEntry, ++this.turnCounter, this.historyDir, this.historyBuffer);

// Write turn-index.json and metadata.json immediately so they exist after session start
await this.historyBuffer.flush();

const agentDir = await getAgentIndexDir(agentId);
await logSessionToIndex(agentDir, {
Expand Down Expand Up @@ -427,26 +430,27 @@ export class ProductionAgent extends AgentBase {
},
});

// Write end entry immediately
if (this.historyDir) {
await appendTranscriptToHistory(
{
timestamp: new Date().toISOString(),
agentId: this.id,
sessionId: this.sessionId,
role: 'agent',
reason,
raw: response.messages,
},
this.historyDir
);
// Update metadata with end time
await updateMetadataEndTime(this.historyDir, new Date().toISOString(), this.planIds);
// Write end entry and flush buffer
if (this.historyDir && this.historyBuffer) {
const endTime = new Date().toISOString();
const endEntry = {
timestamp: endTime,
agentId: this.id,
sessionId: this.sessionId,
role: 'agent' as const,
reason,
raw: response.messages,
};
await logTurnToHistory(endEntry, ++this.turnCounter, this.historyDir, this.historyBuffer);

// Flush all buffered data to disk (turn-index.json and metadata.json)
await this.historyBuffer.flush(endTime);
}

// Clear session data for next session
this.sessionId = undefined;
this.historyDir = undefined;
this.historyBuffer = undefined;
this.planIds = new Set<string>();

return response;
Expand Down
Loading
Loading