diff --git a/src/api/channels.rs b/src/api/channels.rs index a36418724..3d0c4d05b 100644 --- a/src/api/channels.rs +++ b/src/api/channels.rs @@ -26,6 +26,37 @@ pub(super) struct ChannelsResponse { channels: Vec, } +#[derive(Deserialize, Default)] +pub(super) struct ListChannelsQuery { + #[serde(default)] + include_inactive: bool, + agent_id: Option, + is_active: Option, +} + +type AgentChannel = (String, crate::conversation::channels::ChannelInfo); + +fn resolve_is_active_filter(query: &ListChannelsQuery) -> Option { + query.is_active.or(if query.include_inactive { + None + } else { + Some(true) + }) +} + +fn sort_channels_newest_first(channels: &mut [AgentChannel]) { + channels.sort_by( + |(left_agent_id, left_channel), (right_agent_id, right_channel)| { + right_channel + .last_activity_at + .cmp(&left_channel.last_activity_at) + .then_with(|| right_channel.created_at.cmp(&left_channel.created_at)) + .then_with(|| left_agent_id.cmp(right_agent_id)) + .then_with(|| left_channel.id.cmp(&right_channel.id)) + }, + ); +} + #[derive(Serialize)] pub(super) struct MessagesResponse { items: Vec, @@ -57,25 +88,24 @@ pub(super) struct CancelProcessResponse { message: String, } -/// List active channels across all agents. -pub(super) async fn list_channels(State(state): State>) -> Json { +/// List channels across agents, with optional activity and agent filters. +pub(super) async fn list_channels( + State(state): State>, + Query(query): Query, +) -> Json { let pools = state.agent_pools.load(); - let mut all_channels = Vec::new(); + let mut collected_channels: Vec = Vec::new(); + let is_active_filter = resolve_is_active_filter(&query); for (agent_id, pool) in pools.iter() { + if query.agent_id.as_deref().is_some_and(|id| id != agent_id) { + continue; + } let store = ChannelStore::new(pool.clone()); - match store.list_active().await { + match store.list(is_active_filter).await { Ok(channels) => { for channel in channels { - all_channels.push(ChannelResponse { - agent_id: agent_id.clone(), - id: channel.id, - platform: channel.platform, - display_name: channel.display_name, - is_active: channel.is_active, - last_activity_at: channel.last_activity_at.to_rfc3339(), - created_at: channel.created_at.to_rfc3339(), - }); + collected_channels.push((agent_id.clone(), channel)); } } Err(error) => { @@ -84,6 +114,21 @@ pub(super) async fn list_channels(State(state): State>) -> Json>, @@ -181,6 +233,46 @@ pub(super) async fn delete_channel( Ok(Json(serde_json::json!({ "success": true }))) } +/// Archive or unarchive a channel without deleting its history. +pub(super) async fn set_channel_archive( + State(state): State>, + Json(request): Json, +) -> Result, StatusCode> { + let pools = state.agent_pools.load(); + let pool = pools.get(&request.agent_id).ok_or(StatusCode::NOT_FOUND)?; + let store = ChannelStore::new(pool.clone()); + + let is_active = !request.archived; + let updated = store + .set_active(&request.channel_id, is_active) + .await + .map_err(|error| { + tracing::error!(%error, "failed to update channel active state"); + StatusCode::INTERNAL_SERVER_ERROR + })?; + + if !updated { + return Err(StatusCode::NOT_FOUND); + } + + tracing::info!( + agent_id = %request.agent_id, + channel_id = %request.channel_id, + archived = request.archived, + "channel archive state updated via API" + ); + + Ok(Json(archive_update_response_payload(request.archived))) +} + +fn archive_update_response_payload(archived: bool) -> serde_json::Value { + serde_json::json!({ + "success": true, + "archived": archived, + "is_active": !archived, + }) +} + /// Cancel a running worker or branch via the API. pub(super) async fn cancel_process( State(state): State>, @@ -263,3 +355,100 @@ pub(super) async fn cancel_process( _ => Err(StatusCode::BAD_REQUEST), } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn resolve_is_active_filter_defaults_to_active_only() { + let query = ListChannelsQuery { + include_inactive: false, + agent_id: None, + is_active: None, + }; + + assert_eq!(resolve_is_active_filter(&query), Some(true)); + } + + #[test] + fn resolve_is_active_filter_allows_explicit_include_inactive() { + let query = ListChannelsQuery { + include_inactive: true, + agent_id: None, + is_active: None, + }; + + assert_eq!(resolve_is_active_filter(&query), None); + } + + #[test] + fn resolve_is_active_filter_prefers_explicit_state_filter() { + let query = ListChannelsQuery { + include_inactive: true, + agent_id: None, + is_active: Some(false), + }; + + assert_eq!(resolve_is_active_filter(&query), Some(false)); + } + + #[test] + fn archive_update_response_payload_contains_archived_and_is_active() { + let payload = archive_update_response_payload(true); + + assert_eq!(payload["success"], serde_json::Value::Bool(true)); + assert_eq!(payload["archived"], serde_json::Value::Bool(true)); + assert_eq!(payload["is_active"], serde_json::Value::Bool(false)); + } + + #[test] + fn sort_channels_newest_first_by_last_activity_then_created_at() { + fn make_channel( + id: &str, + last_activity_at: &str, + created_at: &str, + ) -> crate::conversation::channels::ChannelInfo { + let last_activity_at = chrono::DateTime::parse_from_rfc3339(last_activity_at) + .expect("timestamp should parse") + .with_timezone(&chrono::Utc); + let created_at = chrono::DateTime::parse_from_rfc3339(created_at) + .expect("timestamp should parse") + .with_timezone(&chrono::Utc); + + crate::conversation::channels::ChannelInfo { + id: id.to_string(), + platform: "portal".to_string(), + display_name: None, + platform_meta: None, + is_active: true, + created_at, + last_activity_at, + } + } + + let mut channels = vec![ + ( + "agent-a".to_string(), + make_channel("a", "2026-03-02T10:00:00Z", "2026-03-02T08:00:00Z"), + ), + ( + "agent-b".to_string(), + make_channel("b", "2026-03-02T12:00:00Z", "2026-03-02T07:00:00Z"), + ), + ( + "agent-c".to_string(), + make_channel("c", "2026-03-02T10:00:00Z", "2026-03-02T09:00:00Z"), + ), + ]; + + sort_channels_newest_first(&mut channels); + + let ids: Vec<_> = channels + .into_iter() + .map(|(agent_id, channel)| format!("{agent_id}:{}", channel.id)) + .collect(); + + assert_eq!(ids, vec!["agent-b:b", "agent-c:c", "agent-a:a"]); + } +} diff --git a/src/api/server.rs b/src/api/server.rs index 24d2fc9e9..cfbda3c89 100644 --- a/src/api/server.rs +++ b/src/api/server.rs @@ -86,6 +86,7 @@ pub async fn start_http_server( "/channels", get(channels::list_channels).delete(channels::delete_channel), ) + .route("/channels/archive", put(channels::set_channel_archive)) .route("/channels/messages", get(channels::channel_messages)) .route("/channels/status", get(channels::channel_status)) .route("/agents/workers", get(workers::list_workers)) diff --git a/src/conversation/channels.rs b/src/conversation/channels.rs index d7536593e..716bd82a7 100644 --- a/src/conversation/channels.rs +++ b/src/conversation/channels.rs @@ -81,21 +81,55 @@ impl ChannelStore { }); } - /// List all active channels, most recently active first. - pub async fn list_active(&self) -> crate::error::Result> { - let rows = sqlx::query( - "SELECT id, platform, display_name, platform_meta, is_active, created_at, last_activity_at \ - FROM channels \ - WHERE is_active = 1 \ - ORDER BY last_activity_at DESC" - ) - .fetch_all(&self.pool) - .await - .map_err(|e| anyhow::anyhow!(e))?; + /// List channels, most recently active first. + /// + /// When `is_active_filter` is `Some(true)` or `Some(false)`, filtering is + /// pushed into SQL. `None` returns both active and inactive channels. + pub async fn list( + &self, + is_active_filter: Option, + ) -> crate::error::Result> { + let rows = match is_active_filter { + Some(true) => { + sqlx::query( + "SELECT id, platform, display_name, platform_meta, is_active, created_at, last_activity_at \ + FROM channels \ + WHERE is_active = 1 \ + ORDER BY last_activity_at DESC", + ) + .fetch_all(&self.pool) + .await + } + Some(false) => { + sqlx::query( + "SELECT id, platform, display_name, platform_meta, is_active, created_at, last_activity_at \ + FROM channels \ + WHERE is_active = 0 \ + ORDER BY last_activity_at DESC", + ) + .fetch_all(&self.pool) + .await + } + None => { + sqlx::query( + "SELECT id, platform, display_name, platform_meta, is_active, created_at, last_activity_at \ + FROM channels \ + ORDER BY last_activity_at DESC", + ) + .fetch_all(&self.pool) + .await + } + } + .map_err(|error| anyhow::anyhow!(error))?; Ok(rows.into_iter().map(row_to_channel_info).collect()) } + /// List all active channels, most recently active first. + pub async fn list_active(&self) -> crate::error::Result> { + self.list(Some(true)).await + } + /// Find a channel by partial name or ID match. /// /// Match priority: exact name > prefix > contains > channel ID contains. @@ -183,6 +217,18 @@ impl ChannelStore { Ok(result.rows_affected() > 0) } + + /// Set active/archive state for a channel. + pub async fn set_active(&self, channel_id: &str, active: bool) -> crate::error::Result { + let result = sqlx::query("UPDATE channels SET is_active = ? WHERE id = ?") + .bind(if active { 1_i64 } else { 0_i64 }) + .bind(channel_id) + .execute(&self.pool) + .await + .map_err(|e| anyhow::anyhow!(e))?; + + Ok(result.rows_affected() > 0) + } } fn row_to_channel_info(row: sqlx::sqlite::SqliteRow) -> ChannelInfo { @@ -299,3 +345,118 @@ fn extract_platform_meta( serde_json::to_string(&serde_json::Value::Object(meta)).ok() } } + +#[cfg(test)] +mod tests { + use super::*; + use sqlx::sqlite::SqlitePoolOptions; + + async fn setup_store() -> ChannelStore { + let pool = SqlitePoolOptions::new() + .max_connections(1) + .connect("sqlite::memory:") + .await + .expect("in-memory sqlite should connect"); + + sqlx::query( + r#" + CREATE TABLE channels ( + id TEXT PRIMARY KEY, + platform TEXT NOT NULL, + display_name TEXT, + platform_meta TEXT, + is_active INTEGER NOT NULL DEFAULT 1, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + last_activity_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP + ) + "#, + ) + .execute(&pool) + .await + .expect("channels table should create"); + + ChannelStore::new(pool) + } + + #[tokio::test] + async fn list_is_active_filter_controls_visibility() { + let store = setup_store().await; + + sqlx::query( + "INSERT INTO channels (id, platform, is_active, created_at, last_activity_at) VALUES (?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)", + ) + .bind("active-channel") + .bind("portal") + .bind(1_i64) + .execute(&store.pool) + .await + .expect("active channel should insert"); + + sqlx::query( + "INSERT INTO channels (id, platform, is_active, created_at, last_activity_at) VALUES (?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)", + ) + .bind("archived-channel") + .bind("portal") + .bind(0_i64) + .execute(&store.pool) + .await + .expect("archived channel should insert"); + + let active_only = store.list(Some(true)).await.expect("list should succeed"); + assert_eq!(active_only.len(), 1); + assert_eq!(active_only[0].id, "active-channel"); + + let archived_only = store.list(Some(false)).await.expect("list should succeed"); + assert_eq!(archived_only.len(), 1); + assert_eq!(archived_only[0].id, "archived-channel"); + + let all = store.list(None).await.expect("list should succeed"); + assert_eq!(all.len(), 2); + assert!(all.iter().any(|c| c.id == "active-channel" && c.is_active)); + assert!( + all.iter() + .any(|c| c.id == "archived-channel" && !c.is_active) + ); + } + + #[tokio::test] + async fn set_active_toggles_channel_state_without_deleting() { + let store = setup_store().await; + + sqlx::query( + "INSERT INTO channels (id, platform, is_active, created_at, last_activity_at) VALUES (?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)", + ) + .bind("chan-1") + .bind("portal") + .bind(1_i64) + .execute(&store.pool) + .await + .expect("channel should insert"); + + let archived = store + .set_active("chan-1", false) + .await + .expect("set_active should succeed"); + assert!(archived, "existing channel should be updated"); + + let channel = store + .get("chan-1") + .await + .expect("get should succeed") + .expect("channel should still exist"); + assert!(!channel.is_active); + + let unarchived = store + .set_active("chan-1", true) + .await + .expect("set_active should succeed"); + assert!(unarchived, "existing channel should be updated"); + + let channel = store + .get("chan-1") + .await + .expect("get should succeed") + .expect("channel should still exist"); + assert!(channel.is_active); + } +}