Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ Relaycast is the messaging backbone:
- Workspace key (`rk_live_*`): admin token for managing workspace resources
- Agent token (`at_live_*`): token an individual agent uses to participate
- Identity types: `agent` (AI worker), `human` (person), `system` (automation/service actor)
- Message payloads and realtime message events include optional `agent_type` so clients can distinguish agent, human, and system senders without extra identity lookups.
- Channel: shared room for team/agent communication
- Message: post in channel/DM/thread, with optional files and reactions

Expand Down
12 changes: 12 additions & 0 deletions openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ components:
type: string
from_name:
type: string
agent_type:
type: string
enum: [agent, human, system]
description: Sender identity type for the message actor
text:
type: string
blocks:
Expand Down Expand Up @@ -183,6 +187,10 @@ components:
type: string
agent_name:
type: string
agent_type:
type: string
enum: [agent, human, system]
description: Sender identity type for the DM message actor
text:
type: string
injection_mode:
Expand Down Expand Up @@ -230,6 +238,10 @@ components:
type: string
agent_name:
type: string
agent_type:
type: string
enum: [agent, human, system]
description: Sender identity type for the message actor
text:
type: string
injection_mode:
Expand Down
6 changes: 4 additions & 2 deletions packages/server/src/durable-objects/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ export class AgentDO implements DurableObject {
// Channel messages (including thread replies) after `since`.
const channelRows = await db.all<Record<string, unknown>>(sql`
SELECT m.id, m.channel_id, m.agent_id, m.body, m.thread_id,
m.created_at, c.name AS channel_name, a.name AS agent_name
m.created_at, c.name AS channel_name, a.name AS agent_name, a.type AS agent_type
FROM messages m
JOIN channel_members cm ON cm.channel_id = m.channel_id AND cm.agent_id = ${agentId}
JOIN channels c ON c.id = m.channel_id
Expand All @@ -196,6 +196,7 @@ export class AgentDO implements DurableObject {
channel_name: row.channel_name,
agent_id: row.agent_id,
from_name: row.agent_name,
agent_type: row.agent_type,
text: row.body,
thread_id: row.thread_id,
created_at: new Date((row.created_at as number) * 1000).toISOString(),
Expand All @@ -211,7 +212,7 @@ export class AgentDO implements DurableObject {
// DM + group DM messages after `since`.
const dmRows = await db.all<Record<string, unknown>>(sql`
SELECT m.id, m.channel_id, m.agent_id, m.body, m.created_at,
a.name AS agent_name, dc.id AS conversation_id, dc.dm_type
a.name AS agent_name, a.type AS agent_type, dc.id AS conversation_id, dc.dm_type
FROM dm_conversations dc
JOIN dm_participants dp ON dp.conversation_id = dc.id AND dp.agent_id = ${agentId} AND dp.left_at IS NULL
JOIN messages m ON m.channel_id = dc.channel_id
Expand All @@ -230,6 +231,7 @@ export class AgentDO implements DurableObject {
agent_id: row.agent_id,
from_agent_id: row.agent_id,
from_name: row.agent_name,
agent_type: row.agent_type,
text: row.body,
created_at: new Date((row.created_at as number) * 1000).toISOString(),
} as Record<string, unknown>;
Expand Down
14 changes: 14 additions & 0 deletions packages/server/src/engine/__tests__/wsTransform.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ describe('transformForClient', () => {
id: 'msg_1',
channel_name: 'general',
agent_id: 'agent_1',
agent_type: 'system',
from_name: 'Bot',
text: 'hi',
attachments: [{ file_id: 'f1', filename: 'a.txt', content_type: 'text/plain', size_bytes: 1 }],
Expand All @@ -29,6 +30,7 @@ describe('transformForClient', () => {
id: 'msg_1',
agent_id: 'agent_1',
agent_name: 'Bot',
agent_type: 'system',
text: 'hi',
attachments: [{ file_id: 'f1', filename: 'a.txt', content_type: 'text/plain', size_bytes: 1 }],
},
Expand All @@ -40,6 +42,7 @@ describe('transformForClient', () => {
id: 'msg_2',
channel_name: 'general',
agent_id: 'agent_1',
agent_type: 'human',
from_name: 'Bot',
text: 'edit',
}, 'ch_1');
Expand All @@ -51,6 +54,7 @@ describe('transformForClient', () => {
id: 'msg_2',
agent_id: 'agent_1',
agent_name: 'Bot',
agent_type: 'human',
text: 'edit',
},
});
Expand All @@ -61,6 +65,7 @@ describe('transformForClient', () => {
id: 'msg_3',
channel_name: 'general',
agent_id: 'agent_2',
agent_type: 'agent',
from_name: 'Alice',
text: 'reply',
thread_id: 'msg_root',
Expand All @@ -74,6 +79,7 @@ describe('transformForClient', () => {
id: 'msg_3',
agent_id: 'agent_2',
agent_name: 'Alice',
agent_type: 'agent',
text: 'reply',
},
});
Expand Down Expand Up @@ -114,6 +120,7 @@ describe('transformForClient', () => {
id: 'msg_6',
conversation_id: 'dm_1',
from_agent_id: 'agent_3',
agent_type: 'human',
from_name: 'Cara',
text: 'dm',
});
Expand All @@ -125,6 +132,7 @@ describe('transformForClient', () => {
id: 'msg_6',
agent_id: 'agent_3',
agent_name: 'Cara',
agent_type: 'human',
text: 'dm',
},
});
Expand All @@ -137,6 +145,7 @@ describe('transformForClient', () => {
id: 'msg_6b',
agent_id: 'agent_3',
agent_name: 'Cara',
agent_type: 'system',
text: 'dm nested',
injection_mode: 'steer',
attachments: [
Expand All @@ -152,6 +161,7 @@ describe('transformForClient', () => {
id: 'msg_6b',
agent_id: 'agent_3',
agent_name: 'Cara',
agent_type: 'system',
text: 'dm nested',
injection_mode: 'steer',
attachments: [
Expand All @@ -166,6 +176,7 @@ describe('transformForClient', () => {
id: 'msg_7',
conversation_id: 'gdm_1',
agent_id: 'agent_4',
agent_type: 'system',
from_name: 'Dan',
text: 'group',
});
Expand All @@ -177,6 +188,7 @@ describe('transformForClient', () => {
id: 'msg_7',
agent_id: 'agent_4',
agent_name: 'Dan',
agent_type: 'system',
text: 'group',
},
});
Expand All @@ -189,6 +201,7 @@ describe('transformForClient', () => {
id: 'msg_7b',
agent_id: 'agent_4',
agent_name: 'Dan',
agent_type: 'human',
text: 'group nested',
attachments: [
{ file_id: 'f2', filename: 'b.txt', content_type: 'text/plain', size_bytes: 2 },
Expand All @@ -203,6 +216,7 @@ describe('transformForClient', () => {
id: 'msg_7b',
agent_id: 'agent_4',
agent_name: 'Dan',
agent_type: 'human',
text: 'group nested',
attachments: [
{ file_id: 'f2', filename: 'b.txt', content_type: 'text/plain', size_bytes: 2 },
Expand Down
11 changes: 9 additions & 2 deletions packages/server/src/engine/dm.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import crypto from 'node:crypto';
import { eq, and, sql, lt, gt, isNull, inArray, asc } from 'drizzle-orm';
import type { DmMessage } from '@relaycast/types';
import type { AgentType, DmMessage } from '@relaycast/types';
import type { getDb } from '../db/index.js';
import {
messages,
Expand All @@ -18,6 +18,7 @@ import { logMessage } from './console.js';
type Db = ReturnType<typeof getDb>;

type AttachmentRow = { file_id: string; filename: string; content_type: string; size_bytes: number };
type SenderType = AgentType | undefined;

interface SendDmOptions {
skipA2aIntercept?: boolean;
Expand Down Expand Up @@ -233,7 +234,7 @@ export async function sendDm(
}

const [fromAgent] = await db
.select({ name: agents.name })
.select({ name: agents.name, type: agents.type })
.from(agents)
.where(and(eq(agents.workspaceId, workspaceId), eq(agents.id, fromAgentId)));

Expand Down Expand Up @@ -306,6 +307,7 @@ export async function sendDm(
id: message.id,
agent_id: message.agentId,
agent_name: fromAgent.name,
agent_type: fromAgent.type as SenderType,
text: message.body,
injection_mode: injectionMode,
attachments,
Expand Down Expand Up @@ -377,10 +379,12 @@ export async function listConversations(db: Db, workspaceId: string, agentId: st
id: messages.id,
channelId: messages.channelId,
agentId: messages.agentId,
agentType: agents.type,
body: messages.body,
createdAt: messages.createdAt,
})
.from(messages)
.leftJoin(agents, eq(messages.agentId, agents.id))
.where(inArray(messages.id, lastIds))
: [];

Expand Down Expand Up @@ -411,6 +415,7 @@ export async function listConversations(db: Db, workspaceId: string, agentId: st
id: lastMessage.id,
text: lastMessage.body,
agent_id: lastMessage.agentId,
agent_type: (lastMessage.agentType as SenderType) || undefined,
created_at: lastMessage.createdAt.toISOString(),
}
: null,
Expand Down Expand Up @@ -478,6 +483,7 @@ export async function getDmMessages(
id: messages.id,
agentId: messages.agentId,
agentName: agents.name,
agentType: agents.type,
body: messages.body,
metadata: messages.metadata,
createdAt: messages.createdAt,
Expand All @@ -494,6 +500,7 @@ export async function getDmMessages(
id: r.id,
agent_id: r.agentId,
agent_name: r.agentName,
agent_type: (r.agentType as SenderType) || undefined,
text: r.body,
injection_mode: (r.metadata as Record<string, unknown> | null)?.injection_mode as 'wait' | 'steer' | undefined,
attachments: attachmentMap.get(r.id) || [],
Expand Down
3 changes: 2 additions & 1 deletion packages/server/src/engine/groupDm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ export async function postGroupMessage(
}

const [fromAgent] = await db
.select({ name: agents.name })
.select({ name: agents.name, type: agents.type })
.from(agents)
.where(and(eq(agents.workspaceId, workspaceId), eq(agents.id, agentId)));

Expand Down Expand Up @@ -217,6 +217,7 @@ export async function postGroupMessage(
id: message.id,
agent_id: message.agentId,
agent_name: fromAgent.name,
agent_type: fromAgent.type,
text: message.body,
injection_mode: injectionMode,
attachments,
Expand Down
9 changes: 7 additions & 2 deletions packages/server/src/engine/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,10 @@ export async function postMessage(
await db.insert(messageAttachments).values(attachmentValues);
}

// Fetch attachment details and agent name
// Fetch attachment details and sender identity
const [attachmentMap, [agent]] = await Promise.all([
hasAttachments ? fetchAttachmentsBatch(db, [messageId]) : Promise.resolve(new Map<string, AttachmentRow[]>()),
db.select({ name: agents.name }).from(agents).where(eq(agents.id, agentId)),
db.select({ name: agents.name, type: agents.type }).from(agents).where(eq(agents.id, agentId)),
]);
const attachments = attachmentMap.get(messageId) || [];

Expand All @@ -113,6 +113,7 @@ export async function postMessage(
channel_id: message.channelId,
agent_id: message.agentId,
agent_name: agent?.name || 'unknown',
agent_type: agent?.type || undefined,
text: message.body,
blocks: (message.blocks as unknown[] | null) || null,
metadata: (message.metadata as Record<string, unknown>) || {},
Expand Down Expand Up @@ -153,6 +154,7 @@ export async function getMessages(
channelId: messages.channelId,
agentId: messages.agentId,
agentName: agents.name,
agentType: agents.type,
threadId: messages.threadId,
body: messages.body,
blocks: messages.blocks,
Expand Down Expand Up @@ -234,6 +236,7 @@ export async function getMessages(
channel_id: row.channelId,
agent_id: row.agentId,
agent_name: row.agentName || 'unknown',
agent_type: row.agentType || undefined,
text: row.body,
blocks: (row.blocks as unknown[] | null) || null,
metadata: (row.metadata as Record<string, unknown>) || {},
Expand All @@ -255,6 +258,7 @@ export async function getMessage(db: Db, workspaceId: string, messageId: string)
channelId: messages.channelId,
agentId: messages.agentId,
agentName: agents.name,
agentType: agents.type,
threadId: messages.threadId,
body: messages.body,
blocks: messages.blocks,
Expand Down Expand Up @@ -295,6 +299,7 @@ export async function getMessage(db: Db, workspaceId: string, messageId: string)
channel_id: row.channelId,
agent_id: row.agentId,
agent_name: row.agentName || 'unknown',
agent_type: row.agentType || undefined,
text: row.body,
blocks: (row.blocks as unknown[] | null) || null,
metadata: (row.metadata as Record<string, unknown>) || {},
Expand Down
12 changes: 8 additions & 4 deletions packages/server/src/engine/thread.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,16 @@ export async function postReply(
})
.returning();

// Resolve agent name
const [agent] = await db.select({ name: agents.name }).from(agents).where(eq(agents.id, agentId));
// Resolve sender identity
const [agent] = await db.select({ name: agents.name, type: agents.type }).from(agents).where(eq(agents.id, agentId));

return {
id: reply.id,
channel_id: reply.channelId,
channel_name: ch?.name,
agent_id: reply.agentId,
agent_name: agent?.name || 'unknown',
agent_type: agent?.type || undefined,
thread_id: reply.threadId,
text: reply.body,
blocks: (reply.blocks as unknown[] | null) || null,
Expand All @@ -71,13 +72,14 @@ export async function getThread(
) {
const limit = Math.min(Math.max(opts.limit || 50, 1), 100);

// Get the parent message with agent name
// Get the parent message with sender identity
const [parent] = await db
.select({
id: messages.id,
channelId: messages.channelId,
agentId: messages.agentId,
agentName: agents.name,
agentType: agents.type,
threadId: messages.threadId,
body: messages.body,
blocks: messages.blocks,
Expand All @@ -101,7 +103,6 @@ export async function getThread(
.from(messages)
.where(eq(messages.threadId, parentId));

// Get replies with pagination
const conditions = [
eq(messages.threadId, parentId),
eq(messages.workspaceId, workspaceId),
Expand All @@ -120,6 +121,7 @@ export async function getThread(
channelId: messages.channelId,
agentId: messages.agentId,
agentName: agents.name,
agentType: agents.type,
threadId: messages.threadId,
body: messages.body,
blocks: messages.blocks,
Expand All @@ -139,6 +141,7 @@ export async function getThread(
channel_id: parent.channelId,
agent_id: parent.agentId,
agent_name: parent.agentName || 'unknown',
agent_type: parent.agentType || undefined,
text: parent.body,
blocks: (parent.blocks as unknown[] | null) || null,
metadata: (parent.metadata as Record<string, unknown>) || {},
Expand All @@ -152,6 +155,7 @@ export async function getThread(
channel_id: r.channelId,
agent_id: r.agentId,
agent_name: r.agentName || 'unknown',
agent_type: r.agentType || undefined,
thread_id: r.threadId,
text: r.body,
blocks: (r.blocks as unknown[] | null) || null,
Expand Down
Loading
Loading