Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/sprout-db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -604,13 +604,15 @@ impl Db {
channel_id: Uuid,
limit: u32,
before_cursor: Option<DateTime<Utc>>,
since_cursor: Option<DateTime<Utc>>,
kind_filter: Option<&[u32]>,
) -> Result<Vec<thread::TopLevelMessage>> {
thread::get_channel_messages_top_level(
&self.pool,
channel_id,
limit,
before_cursor,
since_cursor,
kind_filter,
)
.await
Expand Down
28 changes: 24 additions & 4 deletions crates/sprout-db/src/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,14 +480,19 @@ pub async fn get_thread_summary(pool: &PgPool, event_id: &[u8]) -> Result<Option
/// - At depth 0 (root messages), OR
/// - At depth 1 with `broadcast = true` (replies surfaced to the channel)
///
/// Results are ordered newest-first for a standard channel view.
/// `before_cursor` enables keyset pagination (pass the `created_at` of the
/// last item from the previous page).
/// Default ordering is newest-first (DESC). When `since_cursor` is provided
/// without `before_cursor`, ordering flips to oldest-first (ASC) for
/// chronological polling.
///
/// `before_cursor` enables backward keyset pagination (pass the `created_at`
/// of the last item from the previous page). `since_cursor` enables forward
/// polling (returns only messages created after the given timestamp).
pub async fn get_channel_messages_top_level(
pool: &PgPool,
channel_id: Uuid,
limit: u32,
before_cursor: Option<DateTime<Utc>>,
since_cursor: Option<DateTime<Utc>>,
kind_filter: Option<&[u32]>,
) -> Result<Vec<TopLevelMessage>> {
let mut param_idx = 2u32; // $1 is channel_id
Expand Down Expand Up @@ -520,6 +525,11 @@ pub async fn get_channel_messages_top_level(
param_idx += 1;
}

if since_cursor.is_some() {
sql.push_str(&format!(" AND e.created_at > ${param_idx}"));
param_idx += 1;
}

if let Some(kinds) = kind_filter {
if !kinds.is_empty() {
let list = kinds
Expand All @@ -531,13 +541,23 @@ pub async fn get_channel_messages_top_level(
}
}

sql.push_str(&format!(" ORDER BY e.created_at DESC LIMIT ${param_idx}"));
let order = if since_cursor.is_some() && before_cursor.is_none() {
"ASC"
} else {
"DESC"
};
sql.push_str(&format!(
" ORDER BY e.created_at {order} LIMIT ${param_idx}"
));

let mut q = sqlx::query(&sql).bind(channel_id);

if let Some(cursor) = before_cursor {
q = q.bind(cursor);
}
if let Some(cursor) = since_cursor {
q = q.bind(cursor);
}
q = q.bind(limit as i32);

let rows = q.fetch_all(pool).await?;
Expand Down
14 changes: 12 additions & 2 deletions crates/sprout-mcp/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ pub struct GetMessagesParams {
/// Unix timestamp cursor for pagination. Returns messages before this time.
#[serde(default)]
pub before: Option<i64>,
/// Unix timestamp cursor. Returns messages created strictly after this time.
/// When used without `before`, results are ordered oldest-first (chronological).
/// Useful for polling: pass the timestamp of the last seen message to get only newer ones.
#[serde(default)]
pub since: Option<i64>,
/// Comma-separated event kind numbers to filter by (e.g. "45001" for forum posts,
/// "45002" for votes). When omitted, all kinds are returned.
#[serde(default)]
Expand Down Expand Up @@ -1000,8 +1005,10 @@ Default kind is 9 (stream message)."
/// Get recent messages from a Sprout channel.
#[tool(
name = "get_messages",
description = "Fetch recent top-level messages from a Sprout channel. Use `before` for pagination \
(Unix timestamp). Use `kinds` to filter by event type (e.g. \"45001\" for forum posts, \
description = "Fetch recent top-level messages from a Sprout channel. Use `before` for backward \
pagination and `since` for forward pagination (both Unix timestamps). When `since` is \
provided without `before`, results are ordered oldest-first — useful for polling new \
messages. Use `kinds` to filter by event type (e.g. \"45001\" for forum posts, \
\"45002\" for votes). Thread summaries are included automatically. Threaded replies \
are not returned — use `get_thread` to fetch the full reply tree for a specific message."
)]
Expand All @@ -1024,6 +1031,9 @@ are not returned — use `get_thread` to fetch the full reply tree for a specifi
if let Some(before) = p.before {
query_parts.push(format!("before={before}"));
}
if let Some(since) = p.since {
query_parts.push(format!("since={since}"));
}
if let Some(ref kinds) = p.kinds {
query_parts.push(format!("kinds={}", percent_encode(kinds)));
}
Expand Down
23 changes: 20 additions & 3 deletions crates/sprout-relay/src/api/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
//! - `state.db.insert_thread_metadata(...)` → thread::insert_thread_metadata
//! - `state.db.get_thread_replies(root_id, depth_limit, limit, cursor)` → thread::get_thread_replies
//! - `state.db.get_thread_summary(event_id)` → thread::get_thread_summary
//! - `state.db.get_channel_messages_top_level(channel_id, limit, before, kind_filter)` → thread::get_channel_messages_top_level
//! - `state.db.get_channel_messages_top_level(channel_id, limit, before, since, kind_filter)` → thread::get_channel_messages_top_level
//! - `state.db.get_thread_metadata_by_event(event_id)` → thread::get_thread_metadata_by_event
//! - `state.db.get_event_by_id(id_bytes)` → event::get_event_by_id (already exists)
//! - `state.db.insert_event(event, channel_id)` → event::insert_event (already exists)
Expand Down Expand Up @@ -949,6 +949,10 @@ pub struct ListMessagesParams {
/// Pagination cursor — Unix timestamp (seconds). Returns messages created
/// strictly before this time.
pub before: Option<i64>,
/// Pagination cursor — Unix timestamp (seconds). Returns messages created
/// strictly after this time. Results are ordered oldest-first when `since`
/// is provided without `before`.
pub since: Option<i64>,
/// Legacy parameter (thread summaries are now always included). Kept for backward compatibility.
#[serde(default)]
pub with_threads: bool,
Expand All @@ -957,7 +961,10 @@ pub struct ListMessagesParams {
pub kinds: Option<String>,
}

/// List top-level messages in a channel (newest first).
/// List top-level messages in a channel.
///
/// Default ordering is newest-first (DESC). When `since` is provided without
/// `before`, ordering flips to oldest-first (ASC) for chronological polling.
///
/// Returns root messages and broadcast replies. Thread summaries (reply counts,
/// participant pubkeys) are always included. Thread replies themselves are excluded —
Expand Down Expand Up @@ -985,6 +992,10 @@ pub async fn list_messages(
.before
.and_then(|ts| chrono::DateTime::from_timestamp(ts, 0));

let since_cursor: Option<chrono::DateTime<Utc>> = params
.since
.and_then(|ts| chrono::DateTime::from_timestamp(ts, 0));

let kind_filter: Option<Vec<u32>> = params
.kinds
.as_deref()
Expand All @@ -1003,7 +1014,13 @@ pub async fn list_messages(

let mut messages = state
.db
.get_channel_messages_top_level(channel_id, limit, before_cursor, kind_filter.as_deref())
.get_channel_messages_top_level(
channel_id,
limit,
before_cursor,
since_cursor,
kind_filter.as_deref(),
)
.await
.map_err(|e| internal_error(&format!("db error: {e}")))?;

Expand Down
Loading