Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
215 changes: 202 additions & 13 deletions src/api/channels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,37 @@ pub(super) struct ChannelsResponse {
channels: Vec<ChannelResponse>,
}

#[derive(Deserialize, Default)]
pub(super) struct ListChannelsQuery {
#[serde(default)]
include_inactive: bool,
agent_id: Option<String>,
is_active: Option<bool>,
}

type AgentChannel = (String, crate::conversation::channels::ChannelInfo);

fn resolve_is_active_filter(query: &ListChannelsQuery) -> Option<bool> {
query.is_active.or(if query.include_inactive {
None
} else {
Some(true)
})
}

fn sort_channels_newest_first(channels: &mut [AgentChannel]) {
channels.sort_by(
|(left_agent_id, left_channel), (right_agent_id, right_channel)| {
right_channel
.last_activity_at
.cmp(&left_channel.last_activity_at)
.then_with(|| right_channel.created_at.cmp(&left_channel.created_at))
.then_with(|| left_agent_id.cmp(right_agent_id))
.then_with(|| left_channel.id.cmp(&right_channel.id))
},
);
}

#[derive(Serialize)]
pub(super) struct MessagesResponse {
items: Vec<crate::conversation::history::TimelineItem>,
Expand Down Expand Up @@ -57,25 +88,24 @@ pub(super) struct CancelProcessResponse {
message: String,
}

/// List active channels across all agents.
pub(super) async fn list_channels(State(state): State<Arc<ApiState>>) -> Json<ChannelsResponse> {
/// List channels across agents, with optional activity and agent filters.
pub(super) async fn list_channels(
State(state): State<Arc<ApiState>>,
Query(query): Query<ListChannelsQuery>,
) -> Json<ChannelsResponse> {
let pools = state.agent_pools.load();
let mut all_channels = Vec::new();
let mut collected_channels: Vec<AgentChannel> = Vec::new();
let is_active_filter = resolve_is_active_filter(&query);

for (agent_id, pool) in pools.iter() {
if query.agent_id.as_deref().is_some_and(|id| id != agent_id) {
continue;
}
let store = ChannelStore::new(pool.clone());
match store.list_active().await {
match store.list(is_active_filter).await {
Ok(channels) => {
for channel in channels {
all_channels.push(ChannelResponse {
agent_id: agent_id.clone(),
id: channel.id,
platform: channel.platform,
display_name: channel.display_name,
is_active: channel.is_active,
last_activity_at: channel.last_activity_at.to_rfc3339(),
created_at: channel.created_at.to_rfc3339(),
});
collected_channels.push((agent_id.clone(), channel));
}
}
Err(error) => {
Expand All @@ -84,6 +114,21 @@ pub(super) async fn list_channels(State(state): State<Arc<ApiState>>) -> Json<Ch
}
}

sort_channels_newest_first(&mut collected_channels);

let all_channels = collected_channels
.into_iter()
.map(|(agent_id, channel)| ChannelResponse {
agent_id,
id: channel.id,
platform: channel.platform,
display_name: channel.display_name,
is_active: channel.is_active,
last_activity_at: channel.last_activity_at.to_rfc3339(),
created_at: channel.created_at.to_rfc3339(),
})
.collect();

Json(ChannelsResponse {
channels: all_channels,
})
Expand Down Expand Up @@ -154,6 +199,13 @@ pub(super) struct DeleteChannelQuery {
channel_id: String,
}

#[derive(Deserialize)]
pub(super) struct SetChannelArchiveRequest {
agent_id: String,
channel_id: String,
archived: bool,
}

/// Delete a channel and its message history.
pub(super) async fn delete_channel(
State(state): State<Arc<ApiState>>,
Expand Down Expand Up @@ -181,6 +233,46 @@ pub(super) async fn delete_channel(
Ok(Json(serde_json::json!({ "success": true })))
}

/// Archive or unarchive a channel without deleting its history.
pub(super) async fn set_channel_archive(
State(state): State<Arc<ApiState>>,
Json(request): Json<SetChannelArchiveRequest>,
) -> Result<Json<serde_json::Value>, StatusCode> {
let pools = state.agent_pools.load();
let pool = pools.get(&request.agent_id).ok_or(StatusCode::NOT_FOUND)?;
let store = ChannelStore::new(pool.clone());

let is_active = !request.archived;
let updated = store
.set_active(&request.channel_id, is_active)
.await
.map_err(|error| {
tracing::error!(%error, "failed to update channel active state");
StatusCode::INTERNAL_SERVER_ERROR
})?;

if !updated {
return Err(StatusCode::NOT_FOUND);
}

tracing::info!(
agent_id = %request.agent_id,
channel_id = %request.channel_id,
archived = request.archived,
"channel archive state updated via API"
);

Ok(Json(archive_update_response_payload(request.archived)))
}

fn archive_update_response_payload(archived: bool) -> serde_json::Value {
serde_json::json!({
"success": true,
"archived": archived,
"is_active": !archived,
})
}

/// Cancel a running worker or branch via the API.
pub(super) async fn cancel_process(
State(state): State<Arc<ApiState>>,
Expand Down Expand Up @@ -223,3 +315,100 @@ pub(super) async fn cancel_process(
_ => Err(StatusCode::BAD_REQUEST),
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn resolve_is_active_filter_defaults_to_active_only() {
let query = ListChannelsQuery {
include_inactive: false,
agent_id: None,
is_active: None,
};

assert_eq!(resolve_is_active_filter(&query), Some(true));
}

#[test]
fn resolve_is_active_filter_allows_explicit_include_inactive() {
let query = ListChannelsQuery {
include_inactive: true,
agent_id: None,
is_active: None,
};

assert_eq!(resolve_is_active_filter(&query), None);
}

#[test]
fn resolve_is_active_filter_prefers_explicit_state_filter() {
let query = ListChannelsQuery {
include_inactive: true,
agent_id: None,
is_active: Some(false),
};

assert_eq!(resolve_is_active_filter(&query), Some(false));
}

#[test]
fn archive_update_response_payload_contains_archived_and_is_active() {
let payload = archive_update_response_payload(true);

assert_eq!(payload["success"], serde_json::Value::Bool(true));
assert_eq!(payload["archived"], serde_json::Value::Bool(true));
assert_eq!(payload["is_active"], serde_json::Value::Bool(false));
}

#[test]
fn sort_channels_newest_first_by_last_activity_then_created_at() {
fn make_channel(
id: &str,
last_activity_at: &str,
created_at: &str,
) -> crate::conversation::channels::ChannelInfo {
let last_activity_at = chrono::DateTime::parse_from_rfc3339(last_activity_at)
.expect("timestamp should parse")
.with_timezone(&chrono::Utc);
let created_at = chrono::DateTime::parse_from_rfc3339(created_at)
.expect("timestamp should parse")
.with_timezone(&chrono::Utc);

crate::conversation::channels::ChannelInfo {
id: id.to_string(),
platform: "portal".to_string(),
display_name: None,
platform_meta: None,
is_active: true,
created_at,
last_activity_at,
}
}

let mut channels = vec![
(
"agent-a".to_string(),
make_channel("a", "2026-03-02T10:00:00Z", "2026-03-02T08:00:00Z"),
),
(
"agent-b".to_string(),
make_channel("b", "2026-03-02T12:00:00Z", "2026-03-02T07:00:00Z"),
),
(
"agent-c".to_string(),
make_channel("c", "2026-03-02T10:00:00Z", "2026-03-02T09:00:00Z"),
),
];

sort_channels_newest_first(&mut channels);

let ids: Vec<_> = channels
.into_iter()
.map(|(agent_id, channel)| format!("{agent_id}:{}", channel.id))
.collect();

assert_eq!(ids, vec!["agent-b:b", "agent-c:c", "agent-a:a"]);
}
}
1 change: 1 addition & 0 deletions src/api/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ pub async fn start_http_server(
"/channels",
get(channels::list_channels).delete(channels::delete_channel),
)
.route("/channels/archive", put(channels::set_channel_archive))
.route("/channels/messages", get(channels::channel_messages))
.route("/channels/status", get(channels::channel_status))
.route("/agents/workers", get(workers::list_workers))
Expand Down
Loading