diff --git a/.env.example b/.env.example index 85eaab8..6d7b11e 100644 --- a/.env.example +++ b/.env.example @@ -7,7 +7,7 @@ # All defaults here work with `docker compose up` out of the box. # # Service ports (defaults): -# MySQL → localhost:3306 +# Postgres → localhost:5432 # Redis → localhost:6379 # Typesense → localhost:8108 # Adminer → localhost:8082 (DB browser UI) @@ -16,13 +16,14 @@ # ============================================================================= # ----------------------------------------------------------------------------- -# Database (MySQL 8.0) +# Database (Postgres 17) # ----------------------------------------------------------------------------- -DATABASE_URL=mysql://sprout:sprout_dev@localhost:3306/sprout -MYSQL_ROOT_PASSWORD=sprout_dev -MYSQL_USER=sprout -MYSQL_PASSWORD=sprout_dev -MYSQL_DATABASE=sprout +DATABASE_URL=postgres://sprout:sprout_dev@localhost:5432/sprout +PGHOST=localhost +PGPORT=5432 +PGUSER=sprout +PGPASSWORD=sprout_dev +PGDATABASE=sprout # ----------------------------------------------------------------------------- # Redis 7 @@ -85,11 +86,6 @@ RUST_LOG=sprout_relay=debug,sprout_db=debug,sprout_auth=debug,sprout_pubsub=debu # OTLP tracing endpoint (optional — leave unset to disable) # OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317 -# ----------------------------------------------------------------------------- -# sqlx (offline mode for Docker builds — set to true in CI/Docker) -# ----------------------------------------------------------------------------- -SQLX_OFFLINE=false - # ----------------------------------------------------------------------------- # Huddle (LiveKit integration) # ----------------------------------------------------------------------------- diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f436cf2..8d06378 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -130,15 +130,23 @@ jobs: docker logs "${container}" || true return 1 } - wait_healthy "MySQL" 
"sprout-mysql" + wait_healthy "Postgres" "sprout-postgres" wait_healthy "Redis" "sprout-redis" wait_healthy "Typesense" "sprout-typesense" + - name: Apply database schema + run: ./bin/pgschema apply --file schema/schema.sql --auto-approve + env: + PGHOST: localhost + PGPORT: "5432" + PGUSER: sprout + PGPASSWORD: sprout_dev + PGDATABASE: sprout - name: Build relay run: cargo build -p sprout-relay - name: Start relay run: | nohup env \ - DATABASE_URL=mysql://sprout:sprout_dev@localhost:3306/sprout \ + DATABASE_URL=postgres://sprout:sprout_dev@localhost:5432/sprout \ REDIS_URL=redis://localhost:6379 \ TYPESENSE_URL=http://localhost:8108 \ TYPESENSE_API_KEY=sprout_dev_key \ diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 4988d99..cb0f702 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -38,7 +38,7 @@ Sprout is a Rust monorepo (~22.7K LOC across 13 crates), licensed Apache 2.0 und └──────────┬──────────────┬──────────────────────────────────────────┘ │ │ ┌─────▼──────┐ ┌────▼──────┐ - │ MySQL │ │ Redis │ + │ Postgres │ │ Redis │ │ (events, │ │ (presence │ │ channels, │ │ SET EX, │ │ tokens, │ │ typing │ @@ -69,7 +69,7 @@ Sprout is a Rust monorepo (~22.7K LOC across 13 crates), licensed Apache 2.0 und ``` sprout-core (zero I/O — types, verification, filter matching, kind registry) │ - ├── sprout-db (MySQL: events, channels, tokens, workflows, audit) + ├── sprout-db (Postgres: events, channels, tokens, workflows, audit) ├── sprout-auth (NIP-42, Okta JWT, API tokens, scopes, rate limiting) ├── sprout-pubsub (Redis pub/sub, presence, typing indicators) ├── sprout-search (Typesense: index, query, delete) @@ -245,7 +245,7 @@ Presence events skip membership checks and use local-only fan-out. Multi-node pr 3. REDIS PUBLISH — pubsub.publish_event (no DB write) ``` -Ephemeral events are never stored in MySQL and never appear in REQ historical queries. +Ephemeral events are never stored in Postgres and never appear in REQ historical queries. 
### Handler Semaphore @@ -304,7 +304,7 @@ This prevents a race where a non-member receives live fan-out events from a priv ### Historical Query (EOSE) -After registering, the REQ handler queries MySQL for stored events matching the filters (up to 500 per filter, hard cap). These are sent as `["EVENT", sub_id, event]` frames before `["EOSE", sub_id]`. New events arriving after EOSE are delivered via the fan-out path. +After registering, the REQ handler queries Postgres for stored events matching the filters (up to 500 per filter, hard cap). These are sent as `["EVENT", sub_id, event]` frames before `["EOSE", sub_id]`. New events arriving after EOSE are delivered via the fan-out path. --- @@ -377,7 +377,7 @@ pub trait RateLimiter: Send + Sync { ... } --- -### sprout-db — MySQL Event Store +### sprout-db — Postgres Event Store **3,698 LOC.** All database access. Uses `sqlx::query()` (runtime, not compile-time macros) — no `.sqlx/` offline cache required. @@ -407,7 +407,7 @@ pub trait RateLimiter: Send + Sync { ... } - Approval tokens: raw token never reaches the DB — caller hashes with SHA-256 before passing to `create_api_token`. - DDL injection protection in partition manager: allowlist of table names + strict suffix/date validators. -**Does NOT:** cache queries, implement connection pooling logic (delegated to sqlx), or make network calls outside MySQL. +**Does NOT:** cache queries, implement connection pooling logic (delegated to sqlx), or make network calls outside Postgres. --- @@ -457,7 +457,7 @@ EXPIRE sprout:typing:{channel_id} 60 - `delete_event()` is idempotent: 404 treated as success. - Permission filtering is **caller's responsibility** — `sprout-search` provides the `filter_by` mechanism but does not enforce access policy. -**Does NOT:** enforce channel membership or access control. Does NOT store events in MySQL. +**Does NOT:** enforce channel membership or access control. Does NOT store events in Postgres. 
--- @@ -732,7 +732,7 @@ Every security-sensitive operation uses an explicit, verified pattern. No implic | Token storage | SHA-256 hash only — raw token shown once at mint, never stored | | JWKS cache | Double-checked locking; HTTP fetch with no lock held (prevents global DoS) | | NIP-42 timestamp | ±60 second tolerance — prevents replay attacks | -| AUTH events | Never stored in MySQL, never logged in audit chain | +| AUTH events | Never stored in Postgres, never logged in audit chain | | Scopeless JWT | Defaults to `[MessagesRead]` only — least-privilege default | ### Input Validation @@ -766,7 +766,7 @@ Applied in: `sprout-workflow` (CallWebhook action), `sprout-core` (shared utilit - Channel membership is the only gate — enforced by the relay at every operation - REQ handler checks access before subscription registration — no race window for private channel leaks -- TOCTOU-safe membership operations: all check-then-modify sequences run inside MySQL transactions +- TOCTOU-safe membership operations: all check-then-modify sequences run inside Postgres transactions - Approval tokens: UUID (CSPRNG), stored as SHA-256 hash, single-use enforced with `AND status = 'pending'` in UPDATE ### Webhook Security @@ -785,13 +785,13 @@ Docker Compose provides the full local development stack. 
All services include h | Service | Image | Port | Purpose | |---------|-------|------|---------| -| MySQL | `mysql:8.0` | 3306 | Primary event store — events, channels, tokens, workflows, audit | +| Postgres | `postgres:17-alpine` | 5432 | Primary event store — events, channels, tokens, workflows, audit | | Redis | `redis:7-alpine` | 6379 | Pub/sub fan-out, presence (SET EX), typing (sorted sets) | | Typesense | `typesense/typesense:27.1` | 8108 | Full-text search index | -| Adminer | `adminer` | 8080 | MySQL web UI (dev only) | +| Adminer | `adminer` | 8080 | DB web UI (dev only) | | Keycloak | `quay.io/keycloak/keycloak:26` | 8443 | Local OAuth/OIDC stand-in for Okta | -### MySQL Schema (key tables) +### Postgres Schema (key tables) | Table | Purpose | |-------|---------| diff --git a/Cargo.toml b/Cargo.toml index b789e4e..b0f5e93 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,7 +38,7 @@ tower-http = { version = "0.6", features = ["trace", "cors", "compression-gzip" # Database sqlx = { version = "0.8", features = [ - "runtime-tokio-rustls", "mysql", "uuid", "chrono", "json" + "runtime-tokio-rustls", "postgres", "uuid", "chrono", "json" ] } # Redis diff --git a/TESTING.md b/TESTING.md index b7441c3..cdcdd96 100644 --- a/TESTING.md +++ b/TESTING.md @@ -73,7 +73,7 @@ goose run --help | head -5 ```bash sqlx --version # If missing: -cargo install sqlx-cli --no-default-features --features mysql +# pgschema manages the schema now — sqlx-cli is no longer needed ``` ### screen @@ -113,7 +113,7 @@ lsof -ti :3000 | xargs kill -9 2>/dev/null # Check Docker services — if already running, skip `docker compose up` docker compose ps --format '{{.Name}} {{.Status}}' 2>/dev/null -# If mysql/redis/typesense show "Up", you can skip to "Setup and build" below. +# If postgres/redis/typesense show "Up", you can skip to "Setup and build" below. 
 # If not running:
 docker compose up -d
 ```
@@ -136,9 +136,9 @@ docker compose up -d
 export $(cat .env | grep -v "^#" | grep -v "^$" | xargs) 2>/dev/null
 
 # Reset database (fresh state for tests)
-docker exec sprout-mysql mysql -u root -psprout_dev -e \
-"DROP DATABASE IF EXISTS sprout; CREATE DATABASE sprout;" 2>/dev/null
-sqlx migrate run --database-url "$DATABASE_URL"
+docker exec sprout-postgres psql -U sprout -d postgres -c "DROP DATABASE IF EXISTS sprout;" 2>/dev/null
+docker exec sprout-postgres psql -U sprout -d postgres -c "CREATE DATABASE sprout;" 2>/dev/null
+./bin/pgschema apply --file schema/schema.sql --auto-approve
 
 # Build the full workspace (relay, MCP server, ACP harness, test client, etc.)
 cargo build --release --workspace
@@ -227,7 +227,7 @@ Run all commands from the sprout repo root.
 cd /path/to/sprout
 . bin/activate-hermit
 
-# 1. Start Docker services (MySQL, Redis, Typesense, Keycloak)
+# 1. Start Docker services (Postgres, Redis, Typesense, Keycloak)
 docker compose down -v && docker compose up -d
 docker compose ps # All services should show "Up"
 
@@ -235,8 +235,8 @@ docker compose ps # All services should show "Up"
 [ -f .env ] || cp .env.example .env
 export $(cat .env | grep -v "^#" | grep -v "^$" | xargs) 2>/dev/null
 
-# 3. Run database migrations
-sqlx migrate run --database-url "$DATABASE_URL"
+# 3. Apply database schema
+./bin/pgschema apply --file schema/schema.sql --auto-approve
 
 # 4. Build all binaries (sprout-acp, sprout-mcp-server, mention, sprout-admin)
 cargo build --release --workspace
@@ -1328,8 +1328,8 @@ To start fresh with no stale events, use a new keypair (mint a new token) for th
 docker compose ps
 # If any service is not "Up":
 docker compose down -v && docker compose up -d
-# Wait 30s then re-run migrations:
-sqlx migrate run --database-url "$DATABASE_URL"
+# Wait 30s then re-apply schema:
+./bin/pgschema apply --file schema/schema.sql --auto-approve
 ```
 
 ---
@@ -1380,7 +1380,7 @@ Manual testing guide for the Sprout Agent Channel Protection feature.
Follow the ### 1.1 Sprout Relay - Running Sprout relay in dev mode with `require_auth_token=false` disabled (auth tokens required for all tests) -- MySQL database with the `agent_channel_protection` migration applied (see §2.2) +- Postgres database with schema applied via pgschema (see §2.2) - Default relay URL: `http://localhost:3001` — adjust if different ### 1.2 Tools Required @@ -1389,7 +1389,7 @@ Manual testing guide for the Sprout Agent Channel Protection feature. Follow the |------|---------|---------| | `curl` | REST API testing | Pre-installed on macOS/Linux | | `websocat` | NIP-29 WebSocket testing | `brew install websocat` or `cargo install websocat` | -| `mysql` / `mysql-client` | DB verification queries | `brew install mysql-client` | +| `psql` / `postgresql-client` | DB verification queries | `brew install postgresql` | | `sprout-admin` | Minting agent tokens | Built from `crates/sprout-admin/` | | `jq` | Pretty-print JSON responses | `brew install jq` | @@ -1476,20 +1476,19 @@ curl -s http://localhost:3001/health | jq . The `agent_channel_protection` migration adds `agent_owner_pubkey` and `channel_add_policy` to the `users` table. ```bash -# Using sqlx-cli -cargo install sqlx-cli --no-default-features --features mysql -sqlx migrate run --database-url "$DATABASE_URL" +# Apply schema via pgschema +./bin/pgschema apply --file schema/schema.sql --auto-approve # Or via justfile if configured just migrate ``` -Verify migration applied: +Verify schema applied: ```bash -mysql -u root -p sprout -e "DESCRIBE users;" | grep -E "agent_owner|channel_add" +docker exec sprout-postgres psql -U sprout -d sprout -c "\d users" | grep -E "agent_owner|channel_add" # Expected output: -# agent_owner_pubkey | varbinary(32) | YES | MUL | NULL | -# channel_add_policy | enum(...) 
| NO | | anyone | +# agent_owner_pubkey | bytea | | | +# channel_add_policy | channel_add_policy | | not null | 'anyone'::channel_add_policy ``` ### 2.3 Create a Test Channel @@ -2042,10 +2041,10 @@ curl -s http://localhost:3001/api/users/me \ Direct SQL queries to verify schema and data state. ```bash -# Connect to MySQL -mysql -u root -p sprout -# Or with DATABASE_URL -mysql "$DATABASE_URL" +# Connect to Postgres +docker exec -it sprout-postgres psql -U sprout -d sprout +# Or with the connection URL: +psql "$DATABASE_URL" ``` ### 6.1 Verify Migration Applied (AC-6 prerequisite) diff --git a/bin/.biome-2.4.7.pkg b/bin/.biome-2.4.7.pkg new file mode 120000 index 0000000..383f451 --- /dev/null +++ b/bin/.biome-2.4.7.pkg @@ -0,0 +1 @@ +hermit \ No newline at end of file diff --git a/bin/.pgschema-1.7.4.pkg b/bin/.pgschema-1.7.4.pkg new file mode 120000 index 0000000..383f451 --- /dev/null +++ b/bin/.pgschema-1.7.4.pkg @@ -0,0 +1 @@ +hermit \ No newline at end of file diff --git a/bin/biome b/bin/biome new file mode 120000 index 0000000..ff939ed --- /dev/null +++ b/bin/biome @@ -0,0 +1 @@ +.biome-2.4.7.pkg \ No newline at end of file diff --git a/bin/hermit.hcl b/bin/hermit.hcl index cc17d79..b429ec9 100644 --- a/bin/hermit.hcl +++ b/bin/hermit.hcl @@ -1,4 +1 @@ -manage-git = false - -github-token-auth { -} +manage-git = true diff --git a/bin/pgschema b/bin/pgschema new file mode 120000 index 0000000..f920f8a --- /dev/null +++ b/bin/pgschema @@ -0,0 +1 @@ +.pgschema-1.7.4.pkg \ No newline at end of file diff --git a/crates/sprout-admin/src/main.rs b/crates/sprout-admin/src/main.rs index e58d45d..8bc36dd 100644 --- a/crates/sprout-admin/src/main.rs +++ b/crates/sprout-admin/src/main.rs @@ -45,7 +45,7 @@ async fn main() -> Result<()> { let cli = Cli::parse(); let db_url = std::env::var("DATABASE_URL") - .unwrap_or_else(|_| "mysql://sprout:sprout_dev@localhost:3306/sprout".to_string()); + .unwrap_or_else(|_| 
"postgres://sprout:sprout_dev@localhost:5432/sprout".to_string()); let db = Db::new(&DbConfig { database_url: db_url, diff --git a/crates/sprout-audit/src/lib.rs b/crates/sprout-audit/src/lib.rs index 9d2272c..09f4295 100644 --- a/crates/sprout-audit/src/lib.rs +++ b/crates/sprout-audit/src/lib.rs @@ -1,7 +1,7 @@ #![deny(unsafe_code)] #![warn(missing_docs)] //! Tamper-evident hash-chain audit log. Each entry chains to the previous via -//! SHA-256. Single-writer via MySQL `GET_LOCK`. AUTH events (kind 22242) +//! SHA-256. Single-writer via Postgres `pg_advisory_lock`. AUTH events (kind 22242) //! are rejected — they carry bearer tokens. /// Audit action types recorded in the log. diff --git a/crates/sprout-audit/src/schema.rs b/crates/sprout-audit/src/schema.rs index a825c99..1bdfaa9 100644 --- a/crates/sprout-audit/src/schema.rs +++ b/crates/sprout-audit/src/schema.rs @@ -1,34 +1,18 @@ /// DDL for the `audit_log` table. Passed to [`sqlx::raw_sql`] on startup. -/// -/// Note: `CREATE TABLE IF NOT EXISTS` does not alter existing tables. If the -/// live database has `event_kind SMALLINT` from an earlier schema, run -/// [`AUDIT_MIGRATE_SQL`] once to widen the column to `INT`. 
pub const AUDIT_SCHEMA_SQL: &str = r#" CREATE TABLE IF NOT EXISTS audit_log ( seq BIGINT NOT NULL PRIMARY KEY, - timestamp DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6), + timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(), event_id VARCHAR(255) NOT NULL, event_kind INT NOT NULL, actor_pubkey VARCHAR(255) NOT NULL, action VARCHAR(64) NOT NULL, - channel_id BINARY(16), - metadata JSON NOT NULL, + channel_id BYTEA, + metadata JSONB NOT NULL, prev_hash VARCHAR(64) NOT NULL, - hash VARCHAR(64) NOT NULL, - INDEX idx_audit_log_timestamp (timestamp), - INDEX idx_audit_log_actor (actor_pubkey), - INDEX idx_audit_log_channel (channel_id) + hash VARCHAR(64) NOT NULL ); -"#; - -/// One-time migration: widens `event_kind` from `SMALLINT` to `INT` on databases -/// created before the column type was corrected. Safe to run on an already-`INT` -/// column — MySQL is a no-op when the type matches. -/// -/// Run this manually: -/// ```sql -/// ALTER TABLE audit_log MODIFY COLUMN event_kind INT NOT NULL; -/// ``` -pub const AUDIT_MIGRATE_SQL: &str = r#" -ALTER TABLE audit_log MODIFY COLUMN event_kind INT NOT NULL; +CREATE INDEX IF NOT EXISTS idx_audit_log_timestamp ON audit_log (timestamp); +CREATE INDEX IF NOT EXISTS idx_audit_log_actor ON audit_log (actor_pubkey); +CREATE INDEX IF NOT EXISTS idx_audit_log_channel ON audit_log (channel_id); "#; diff --git a/crates/sprout-audit/src/service.rs b/crates/sprout-audit/src/service.rs index 15d8459..d2da43a 100644 --- a/crates/sprout-audit/src/service.rs +++ b/crates/sprout-audit/src/service.rs @@ -1,6 +1,6 @@ use chrono::{DateTime, Utc}; use futures_util::FutureExt as _; -use sqlx::{Acquire, MySqlPool, Row}; +use sqlx::{Acquire, PgPool, Row}; use tracing::{debug, instrument, warn}; use sprout_core::kind::KIND_AUTH; @@ -13,20 +13,20 @@ use crate::{ schema::AUDIT_SCHEMA_SQL, }; -const AUDIT_LOCK_NAME: &str = "sprout_audit"; -const AUDIT_LOCK_TIMEOUT_SECS: i64 = 10; +/// Advisory lock key derived from a stable hash of "sprout_audit". 
+const AUDIT_LOCK_KEY: i64 = 0x5370_7275_7441_7564; // "SprutAud" as hex -/// Append-only audit log service backed by MySQL. +/// Append-only audit log service backed by Postgres. /// -/// Serialises writes via `GET_LOCK` so the hash chain remains consistent +/// Serialises writes via `pg_advisory_lock` so the hash chain remains consistent /// even when multiple relay processes share the same database. pub struct AuditService { - pool: MySqlPool, + pool: PgPool, } impl AuditService { /// Creates a new `AuditService` using the given connection pool. - pub fn new(pool: MySqlPool) -> Self { + pub fn new(pool: PgPool) -> Self { Self { pool } } @@ -36,13 +36,10 @@ impl AuditService { Ok(()) } - /// Append a new entry to the audit log. Single-writer via `GET_LOCK`. + /// Append a new entry to the audit log. Single-writer via `pg_advisory_lock`. /// - /// MySQL's GET_LOCK is session-scoped (not transaction-scoped), so we - /// acquire it before beginning the transaction and release it explicitly - /// after commit (or on any error path). `DO RELEASE_LOCK` is called in - /// all branches — success, error, and via `tokio::task::spawn` on panic — - /// so the lock is never left held on a pooled connection. + /// Postgres advisory locks are session-scoped, so we acquire before the + /// transaction and release after commit (or on any error path). #[instrument(skip(self, entry), fields(action = %entry.action))] pub async fn log(&self, entry: NewAuditEntry) -> Result { if entry.event_kind == KIND_AUTH { @@ -52,31 +49,21 @@ impl AuditService { let mut conn = self.pool.acquire().await?; - let lock_acquired: i32 = sqlx::query_scalar("SELECT GET_LOCK(?, ?)") - .bind(AUDIT_LOCK_NAME) - .bind(AUDIT_LOCK_TIMEOUT_SECS) - .fetch_one(&mut *conn) + // Acquire session-level advisory lock (blocks until available). 
+ sqlx::query("SELECT pg_advisory_lock($1)") + .bind(AUDIT_LOCK_KEY) + .execute(&mut *conn) .await?; - if lock_acquired != 1 { - return Err(AuditError::Database(sqlx::Error::Protocol( - "failed to acquire advisory lock for audit log".into(), - ))); - } - // Run log_inner and release the lock regardless of outcome. // We use catch_unwind to handle panics so the lock is always released // before the connection is returned to the pool. - // - // ⚠️ SAFETY: log_inner is not UnwindSafe by default (it holds &mut conn), - // but we use AssertUnwindSafe because we own the connection and will not - // observe partial state after a panic — the connection is dropped. let result = std::panic::AssertUnwindSafe(self.log_inner(&mut conn, entry)) .catch_unwind() .await; - let _ = sqlx::query("DO RELEASE_LOCK(?)") - .bind(AUDIT_LOCK_NAME) + let _ = sqlx::query("SELECT pg_advisory_unlock($1)") + .bind(AUDIT_LOCK_KEY) .execute(&mut *conn) .await; @@ -88,7 +75,7 @@ impl AuditService { async fn log_inner( &self, - conn: &mut sqlx::pool::PoolConnection, + conn: &mut sqlx::pool::PoolConnection, entry: NewAuditEntry, ) -> Result { let mut tx = conn.begin().await?; @@ -130,7 +117,7 @@ impl AuditService { INSERT INTO audit_log (seq, timestamp, event_id, event_kind, actor_pubkey, action, channel_id, metadata, prev_hash, hash) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) "#, ) .bind(audit_entry.seq) @@ -160,7 +147,7 @@ impl AuditService { SELECT seq, timestamp, event_id, event_kind, actor_pubkey, action, channel_id, metadata, prev_hash, hash FROM audit_log - WHERE seq BETWEEN ? AND ? + WHERE seq BETWEEN $1 AND $2 ORDER BY seq ASC "#, ) @@ -217,9 +204,9 @@ impl AuditService { SELECT seq, timestamp, event_id, event_kind, actor_pubkey, action, channel_id, metadata, prev_hash, hash FROM audit_log - WHERE seq >= ? + WHERE seq >= $1 ORDER BY seq ASC - LIMIT ? 
+ LIMIT $2 "#, ) .bind(from_seq) @@ -231,7 +218,7 @@ impl AuditService { } } -fn row_to_audit_entry(row: &sqlx::mysql::MySqlRow) -> Result { +fn row_to_audit_entry(row: &sqlx::postgres::PgRow) -> Result { let seq: i64 = row.get("seq"); let action_str: String = row.get("action"); let action: AuditAction = action_str.parse().map_err(|_| { @@ -277,10 +264,10 @@ mod tests { DB_LOCK.get_or_init(|| Mutex::new(())) } - async fn test_pool() -> Option { + async fn test_pool() -> Option { let url = std::env::var("DATABASE_URL") - .unwrap_or_else(|_| "mysql://sprout:sprout_dev@localhost:3306/sprout".into()); - MySqlPool::connect(&url).await.ok() + .unwrap_or_else(|_| "postgres://sprout:sprout_dev@localhost:5432/sprout".into()); + PgPool::connect(&url).await.ok() } fn sample_new_entry(kind: u32, action: AuditAction) -> NewAuditEntry { @@ -294,7 +281,7 @@ mod tests { } } - async fn reset_audit_table(pool: &MySqlPool) { + async fn reset_audit_table(pool: &PgPool) { sqlx::query("TRUNCATE TABLE audit_log") .execute(pool) .await @@ -302,7 +289,7 @@ mod tests { } #[tokio::test] - #[ignore = "requires MySQL"] + #[ignore = "requires Postgres"] async fn genesis_entry() { let _guard = db_lock().lock().await; let Some(pool) = test_pool().await else { @@ -323,7 +310,7 @@ mod tests { } #[tokio::test] - #[ignore = "requires MySQL"] + #[ignore = "requires Postgres"] async fn chain_integrity() { let _guard = db_lock().lock().await; let Some(pool) = test_pool().await else { @@ -354,7 +341,7 @@ mod tests { } #[tokio::test] - #[ignore = "requires MySQL"] + #[ignore = "requires Postgres"] async fn verify_chain_detects_tampering() { let _guard = db_lock().lock().await; let Some(pool) = test_pool().await else { @@ -377,7 +364,7 @@ mod tests { .await .unwrap(); - sqlx::query("UPDATE audit_log SET actor_pubkey = 'tampered' WHERE seq = ?") + sqlx::query("UPDATE audit_log SET actor_pubkey = 'tampered' WHERE seq = $1") .bind(e2.seq) .execute(&pool) .await @@ -388,7 +375,7 @@ mod tests { } #[tokio::test] 
- #[ignore = "requires MySQL"] + #[ignore = "requires Postgres"] async fn auth_events_rejected() { let Some(pool) = test_pool().await else { return; diff --git a/crates/sprout-core/src/kind.rs b/crates/sprout-core/src/kind.rs index 87f145f..93705c8 100644 --- a/crates/sprout-core/src/kind.rs +++ b/crates/sprout-core/src/kind.rs @@ -308,7 +308,7 @@ pub fn event_kind_u32(event: &nostr::Event) -> u32 { event.kind.as_u16() as u32 } -/// Extract the kind from a nostr Event as i32 (for MySQL INT columns). +/// Extract the kind from a nostr Event as i32 (for Postgres INT columns). /// Safe: all Sprout kinds fit in i32 (max 65535 < i32::MAX). pub fn event_kind_i32(event: &nostr::Event) -> i32 { event.kind.as_u16() as i32 diff --git a/crates/sprout-db/Cargo.toml b/crates/sprout-db/Cargo.toml index d9656e0..313159e 100644 --- a/crates/sprout-db/Cargo.toml +++ b/crates/sprout-db/Cargo.toml @@ -5,7 +5,7 @@ edition.workspace = true rust-version.workspace = true license.workspace = true repository.workspace = true -description = "MySQL event store and data access layer for Sprout" +description = "Postgres event store and data access layer for Sprout" [dependencies] sprout-core = { workspace = true } diff --git a/crates/sprout-db/src/api_token.rs b/crates/sprout-db/src/api_token.rs index 51a4d6c..9d3f177 100644 --- a/crates/sprout-db/src/api_token.rs +++ b/crates/sprout-db/src/api_token.rs @@ -1,16 +1,15 @@ //! API token CRUD operations. use chrono::{DateTime, Utc}; -use sqlx::{MySqlPool, Row}; +use sqlx::{PgPool, Row}; use uuid::Uuid; use crate::error::{DbError, Result}; -use crate::event::uuid_from_bytes; /// Create a new API token record. The caller is responsible for generating /// the raw token and computing its SHA-256 hash. 
pub async fn create_api_token( - pool: &MySqlPool, + pool: &PgPool, token_hash: &[u8], owner_pubkey: &[u8], name: &str, @@ -19,7 +18,6 @@ pub async fn create_api_token( expires_at: Option>, ) -> Result { let id = Uuid::new_v4(); - let id_bytes = id.as_bytes().as_slice(); let scopes_json = serde_json::to_value(scopes).map_err(|e| DbError::InvalidData(e.to_string()))?; @@ -35,10 +33,10 @@ pub async fn create_api_token( sqlx::query( r#" INSERT INTO api_tokens (id, token_hash, owner_pubkey, name, scopes, channel_ids, expires_at) - VALUES (?, ?, ?, ?, ?, ?, ?) + VALUES ($1, $2, $3, $4, $5, $6, $7) "#, ) - .bind(id_bytes) + .bind(id) .bind(token_hash) .bind(owner_pubkey) .bind(name) @@ -53,12 +51,12 @@ pub async fn create_api_token( /// Atomic conditional INSERT: create a token only if the owner has fewer than 10 active tokens. /// -/// Uses a `SELECT ... WHERE COUNT < 10` subquery so the check and insert are atomic — +/// Uses a subquery so the check and insert are atomic -- /// no TOCTOU race between a separate count query and the insert. /// /// Returns `Ok(Some(uuid))` on success, `Ok(None)` if the 10-token limit is exceeded. pub async fn create_api_token_if_under_limit( - pool: &MySqlPool, + pool: &PgPool, token_hash: &[u8], owner_pubkey: &[u8], name: &str, @@ -67,7 +65,6 @@ pub async fn create_api_token_if_under_limit( expires_at: Option>, ) -> Result> { let id = Uuid::new_v4(); - let id_bytes = id.as_bytes().as_slice(); let scopes_json = serde_json::to_value(scopes).map_err(|e| DbError::InvalidData(e.to_string()))?; @@ -80,23 +77,22 @@ pub async fn create_api_token_if_under_limit( .transpose()?; // Conditional INSERT: only inserts if active (non-revoked, non-expired) token count < 10. - // The subquery and insert execute atomically — no separate count + insert race. + // The subquery and insert execute atomically -- no separate count + insert race. 
let result = sqlx::query( r#" INSERT INTO api_tokens (id, token_hash, owner_pubkey, name, scopes, channel_ids, expires_at, created_by_self_mint) - SELECT ?, ?, ?, ?, ?, ?, ?, TRUE - FROM DUAL + SELECT $1, $2, $3, $4, $5, $6, $7, TRUE WHERE ( SELECT COUNT(*) FROM api_tokens - WHERE owner_pubkey = ? + WHERE owner_pubkey = $8 AND revoked_at IS NULL AND (expires_at IS NULL OR expires_at > NOW()) ) < 10 "#, ) - .bind(id_bytes) + .bind(id) .bind(token_hash) .bind(owner_pubkey) .bind(name) @@ -108,7 +104,7 @@ pub async fn create_api_token_if_under_limit( .await?; if result.rows_affected() == 0 { - // Limit exceeded — the WHERE clause prevented the INSERT. + // Limit exceeded -- the WHERE clause prevented the INSERT. return Ok(None); } @@ -122,7 +118,7 @@ pub async fn create_api_token_if_under_limit( /// The relay layer uses this to return distinct `token_revoked` vs `invalid_token` /// error responses rather than treating both as "not found". pub async fn get_api_token_by_hash_including_revoked( - pool: &MySqlPool, + pool: &PgPool, hash: &[u8], ) -> Result> { let row = sqlx::query( @@ -130,7 +126,7 @@ pub async fn get_api_token_by_hash_including_revoked( SELECT id, token_hash, owner_pubkey, name, scopes, channel_ids, created_at, expires_at, last_used_at, revoked_at FROM api_tokens - WHERE token_hash = ? + WHERE token_hash = $1 "#, ) .bind(hash) @@ -142,8 +138,7 @@ pub async fn get_api_token_by_hash_including_revoked( Some(r) => r, }; - let id_bytes: Vec = row.try_get("id")?; - let id = uuid_from_bytes(&id_bytes)?; + let id: Uuid = row.try_get("id")?; let scopes_json: serde_json::Value = row.try_get("scopes")?; let scopes: Vec = serde_json::from_value(scopes_json) @@ -180,11 +175,11 @@ pub async fn get_api_token_by_hash_including_revoked( /// List all tokens (including revoked) for a pubkey, ordered by creation time descending. /// /// Returns the full [`crate::ApiTokenRecord`] including `token_hash`. 
Callers are -/// responsible for stripping `token_hash` before returning data to clients — the +/// responsible for stripping `token_hash` before returning data to clients -- the /// raw token value is never exposed after the initial mint response. /// Used by `GET /api/tokens` to show a user their full token history. pub async fn list_tokens_by_owner( - pool: &MySqlPool, + pool: &PgPool, pubkey: &[u8], ) -> Result> { let rows = sqlx::query( @@ -192,7 +187,7 @@ pub async fn list_tokens_by_owner( SELECT id, token_hash, owner_pubkey, name, scopes, channel_ids, created_at, expires_at, last_used_at, revoked_at FROM api_tokens - WHERE owner_pubkey = ? + WHERE owner_pubkey = $1 ORDER BY created_at DESC "#, ) @@ -202,8 +197,7 @@ pub async fn list_tokens_by_owner( let mut out = Vec::with_capacity(rows.len()); for row in rows { - let id_bytes: Vec = row.try_get("id")?; - let id = uuid_from_bytes(&id_bytes)?; + let id: Uuid = row.try_get("id")?; let scopes_json: serde_json::Value = row.try_get("scopes")?; let scopes: Vec = serde_json::from_value(scopes_json) @@ -247,23 +241,22 @@ pub async fn list_tokens_by_owner( /// Only revokes if the token is owned by `owner_pubkey` and not already revoked. /// Returns `true` if the token was revoked, `false` if not found, not owned, or already revoked. pub async fn revoke_token( - pool: &MySqlPool, + pool: &PgPool, id: Uuid, owner_pubkey: &[u8], revoked_by: &[u8], ) -> Result { - let id_bytes = id.as_bytes().as_slice(); let result = sqlx::query( r#" UPDATE api_tokens - SET revoked_at = NOW(6), revoked_by = ? - WHERE id = ? - AND owner_pubkey = ? + SET revoked_at = NOW(), revoked_by = $1 + WHERE id = $2 + AND owner_pubkey = $3 AND revoked_at IS NULL "#, ) .bind(revoked_by) - .bind(id_bytes) + .bind(id) .bind(owner_pubkey) .execute(pool) .await?; @@ -276,15 +269,15 @@ pub async fn revoke_token( /// Skips already-revoked tokens (idempotent). Returns the count of newly revoked tokens. 
/// If all tokens are already revoked, returns 0 with no error. pub async fn revoke_all_tokens( - pool: &MySqlPool, + pool: &PgPool, owner_pubkey: &[u8], revoked_by: &[u8], ) -> Result { let result = sqlx::query( r#" UPDATE api_tokens - SET revoked_at = NOW(6), revoked_by = ? - WHERE owner_pubkey = ? + SET revoked_at = NOW(), revoked_by = $1 + WHERE owner_pubkey = $2 AND revoked_at IS NULL "#, ) diff --git a/crates/sprout-db/src/channel.rs b/crates/sprout-db/src/channel.rs index ebe8b58..cfb4eda 100644 --- a/crates/sprout-db/src/channel.rs +++ b/crates/sprout-db/src/channel.rs @@ -5,11 +5,10 @@ //! - `private`: hidden, invite-only use chrono::{DateTime, Utc}; -use sqlx::{MySql, MySqlPool, Row, Transaction}; +use sqlx::{PgPool, Postgres, Row, Transaction}; use uuid::Uuid; use crate::error::{DbError, Result}; -use crate::event::uuid_from_bytes; /// Whether a channel is publicly visible or invite-only. #[derive(Debug, Clone, PartialEq, Eq)] @@ -199,7 +198,7 @@ pub struct MemberRecord { /// Creates a new channel, bootstraps the creator as owner, and returns the record. pub async fn create_channel( - pool: &MySqlPool, + pool: &PgPool, name: &str, channel_type: ChannelType, visibility: ChannelVisibility, @@ -214,17 +213,16 @@ pub async fn create_channel( } let id = Uuid::new_v4(); - let id_bytes = id.as_bytes().as_slice().to_vec(); let mut tx = pool.begin().await?; sqlx::query( r#" INSERT INTO channels (id, name, channel_type, visibility, description, created_by) - VALUES (?, ?, ?, ?, ?, ?) + VALUES ($1, $2, $3::channel_type, $4::channel_visibility, $5, $6) "#, ) - .bind(&id_bytes) + .bind(id) .bind(name) .bind(channel_type.as_str()) .bind(visibility.as_str()) @@ -236,14 +234,14 @@ pub async fn create_channel( sqlx::query( r#" INSERT INTO channel_members (channel_id, pubkey, role, invited_by) - VALUES (?, ?, 'owner', ?) 
- ON DUPLICATE KEY UPDATE + VALUES ($1, $2, 'owner', $3) + ON CONFLICT (channel_id, pubkey) DO UPDATE SET removed_at = NULL, removed_by = NULL, - role = VALUES(role) + role = EXCLUDED.role "#, ) - .bind(&id_bytes) + .bind(id) .bind(created_by) .bind(created_by) .execute(&mut *tx) @@ -251,15 +249,16 @@ pub async fn create_channel( let row = sqlx::query( r#" - SELECT id, name, channel_type, visibility, description, canvas, + SELECT id, name, channel_type::text AS channel_type, visibility::text AS visibility, + description, canvas, created_by, created_at, updated_at, archived_at, deleted_at, nip29_group_id, topic_required, max_members, topic, topic_set_by, topic_set_at, purpose, purpose_set_by, purpose_set_at - FROM channels WHERE id = ? + FROM channels WHERE id = $1 "#, ) - .bind(&id_bytes) + .bind(id) .fetch_one(&mut *tx) .await?; @@ -269,20 +268,19 @@ pub async fn create_channel( } /// Fetches a channel record by ID. Returns `ChannelNotFound` if missing or deleted. -pub async fn get_channel(pool: &MySqlPool, channel_id: Uuid) -> Result { - let id_bytes = channel_id.as_bytes().as_slice().to_vec(); - +pub async fn get_channel(pool: &PgPool, channel_id: Uuid) -> Result { let row = sqlx::query( r#" - SELECT id, name, channel_type, visibility, description, canvas, + SELECT id, name, channel_type::text AS channel_type, visibility::text AS visibility, + description, canvas, created_by, created_at, updated_at, archived_at, deleted_at, nip29_group_id, topic_required, max_members, topic, topic_set_by, topic_set_at, purpose, purpose_set_by, purpose_set_at - FROM channels WHERE id = ? AND deleted_at IS NULL + FROM channels WHERE id = $1 AND deleted_at IS NULL "#, ) - .bind(&id_bytes) + .bind(channel_id) .fetch_optional(pool) .await? 
.ok_or(DbError::ChannelNotFound(channel_id))?; @@ -291,10 +289,9 @@ pub async fn get_channel(pool: &MySqlPool, channel_id: Uuid) -> Result Result> { - let id_bytes = channel_id.as_bytes().as_slice().to_vec(); - let row = sqlx::query("SELECT canvas FROM channels WHERE id = ? AND deleted_at IS NULL") - .bind(&id_bytes) +pub async fn get_canvas(pool: &PgPool, channel_id: Uuid) -> Result> { + let row = sqlx::query("SELECT canvas FROM channels WHERE id = $1 AND deleted_at IS NULL") + .bind(channel_id) .fetch_optional(pool) .await? .ok_or(DbError::ChannelNotFound(channel_id))?; @@ -302,11 +299,10 @@ pub async fn get_canvas(pool: &MySqlPool, channel_id: Uuid) -> Result) -> Result<()> { - let id_bytes = channel_id.as_bytes().as_slice().to_vec(); - let rows = sqlx::query("UPDATE channels SET canvas = ? WHERE id = ? AND deleted_at IS NULL") +pub async fn set_canvas(pool: &PgPool, channel_id: Uuid, canvas: Option<&str>) -> Result<()> { + let rows = sqlx::query("UPDATE channels SET canvas = $1 WHERE id = $2 AND deleted_at IS NULL") .bind(canvas) - .bind(&id_bytes) + .bind(channel_id) .execute(pool) .await?; if rows.rows_affected() == 0 { @@ -327,7 +323,7 @@ pub async fn set_canvas(pool: &MySqlPool, channel_id: Uuid, canvas: Option<&str> /// The entire check-then-insert sequence runs inside a transaction to prevent TOCTOU /// races (e.g. the inviter being removed between the role check and the INSERT). pub async fn add_member( - pool: &MySqlPool, + pool: &PgPool, channel_id: Uuid, pubkey: &[u8], role: MemberRole, @@ -340,8 +336,6 @@ pub async fn add_member( ))); } - let channel_id_bytes = channel_id.as_bytes().as_slice().to_vec(); - let mut tx = pool.begin().await?; let channel = get_channel_tx(&mut tx, channel_id).await?; @@ -404,14 +398,14 @@ pub async fn add_member( sqlx::query( r#" INSERT INTO channel_members (channel_id, pubkey, role, invited_by) - VALUES (?, ?, ?, ?) 
- ON DUPLICATE KEY UPDATE + VALUES ($1, $2, $3::member_role, $4) + ON CONFLICT (channel_id, pubkey) DO UPDATE SET removed_at = NULL, removed_by = NULL, - role = VALUES(role) + role = EXCLUDED.role "#, ) - .bind(&channel_id_bytes) + .bind(channel_id) .bind(pubkey) .bind(effective_role.as_str()) .bind(invited_by) @@ -420,11 +414,11 @@ pub async fn add_member( let row = sqlx::query( r#" - SELECT channel_id, pubkey, role, joined_at, invited_by, removed_at - FROM channel_members WHERE channel_id = ? AND pubkey = ? + SELECT channel_id, pubkey, role::text AS role, joined_at, invited_by, removed_at + FROM channel_members WHERE channel_id = $1 AND pubkey = $2 "#, ) - .bind(&channel_id_bytes) + .bind(channel_id) .bind(pubkey) .fetch_one(&mut *tx) .await?; @@ -442,13 +436,11 @@ pub async fn add_member( /// The authorization check and the UPDATE run inside a transaction to prevent a /// TOCTOU race where the actor's role changes between the check and the update. pub async fn remove_member( - pool: &MySqlPool, + pool: &PgPool, channel_id: Uuid, pubkey: &[u8], actor_pubkey: &[u8], ) -> Result<()> { - let channel_id_bytes = channel_id.as_bytes().as_slice().to_vec(); - let mut tx = pool.begin().await?; let is_self_remove = pubkey == actor_pubkey; @@ -473,9 +465,9 @@ pub async fn remove_member( if target_role.as_deref() == Some("owner") { let row = sqlx::query( "SELECT COUNT(*) as cnt FROM channel_members \ - WHERE channel_id = ? AND role = 'owner' AND removed_at IS NULL", + WHERE channel_id = $1 AND role = 'owner' AND removed_at IS NULL", ) - .bind(&channel_id_bytes) + .bind(channel_id) .fetch_one(&mut *tx) .await?; let owner_count: i64 = row.try_get("cnt")?; @@ -489,12 +481,12 @@ pub async fn remove_member( let result = sqlx::query( r#" UPDATE channel_members - SET removed_at = NOW(), removed_by = ? - WHERE channel_id = ? AND pubkey = ? 
AND removed_at IS NULL + SET removed_at = NOW(), removed_by = $1 + WHERE channel_id = $2 AND pubkey = $3 AND removed_at IS NULL "#, ) .bind(actor_pubkey) - .bind(&channel_id_bytes) + .bind(channel_id) .bind(pubkey) .execute(&mut *tx) .await?; @@ -508,14 +500,13 @@ pub async fn remove_member( } /// Returns `true` if the given pubkey is an active member of the channel. -pub async fn is_member(pool: &MySqlPool, channel_id: Uuid, pubkey: &[u8]) -> Result { - let channel_id_bytes = channel_id.as_bytes().as_slice().to_vec(); +pub async fn is_member(pool: &PgPool, channel_id: Uuid, pubkey: &[u8]) -> Result { let row = sqlx::query( "SELECT COUNT(*) as cnt FROM channel_members cm \ JOIN channels c ON cm.channel_id = c.id AND c.deleted_at IS NULL \ - WHERE cm.channel_id = ? AND cm.pubkey = ? AND cm.removed_at IS NULL", + WHERE cm.channel_id = $1 AND cm.pubkey = $2 AND cm.removed_at IS NULL", ) - .bind(&channel_id_bytes) + .bind(channel_id) .bind(pubkey) .fetch_one(pool) .await?; @@ -526,19 +517,18 @@ pub async fn is_member(pool: &MySqlPool, channel_id: Uuid, pubkey: &[u8]) -> Res /// Returns all active members of the given channel. /// /// Returns an empty list if the channel has been soft-deleted. -pub async fn get_members(pool: &MySqlPool, channel_id: Uuid) -> Result> { - let channel_id_bytes = channel_id.as_bytes().as_slice().to_vec(); +pub async fn get_members(pool: &PgPool, channel_id: Uuid) -> Result> { let rows = sqlx::query( r#" - SELECT cm.channel_id, cm.pubkey, cm.role, cm.joined_at, cm.invited_by, cm.removed_at + SELECT cm.channel_id, cm.pubkey, cm.role::text AS role, cm.joined_at, cm.invited_by, cm.removed_at FROM channel_members cm JOIN channels c ON cm.channel_id = c.id AND c.deleted_at IS NULL - WHERE cm.channel_id = ? 
AND cm.removed_at IS NULL + WHERE cm.channel_id = $1 AND cm.removed_at IS NULL ORDER BY cm.joined_at ASC LIMIT 1000 "#, ) - .bind(&channel_id_bytes) + .bind(channel_id) .fetch_all(pool) .await?; rows.into_iter().map(row_to_member_record).collect() @@ -548,13 +538,13 @@ pub async fn get_members(pool: &MySqlPool, channel_id: Uuid) -> Result Result> { +pub async fn get_accessible_channel_ids(pool: &PgPool, pubkey: &[u8]) -> Result> { let rows = sqlx::query( r#" SELECT cm.channel_id FROM channel_members cm JOIN channels c ON cm.channel_id = c.id AND c.deleted_at IS NULL - WHERE cm.pubkey = ? AND cm.removed_at IS NULL + WHERE cm.pubkey = $1 AND cm.removed_at IS NULL UNION SELECT id AS channel_id FROM channels @@ -568,27 +558,25 @@ pub async fn get_accessible_channel_ids(pool: &MySqlPool, pubkey: &[u8]) -> Resu rows.into_iter() .map(|r| { - let bytes: Vec = r.try_get("channel_id")?; - uuid_from_bytes(&bytes) + let id: Uuid = r.try_get("channel_id")?; + Ok(id) }) .collect() } /// Lists channels, optionally filtered by visibility string. -pub async fn list_channels( - pool: &MySqlPool, - visibility: Option<&str>, -) -> Result> { +pub async fn list_channels(pool: &PgPool, visibility: Option<&str>) -> Result> { let rows = if let Some(vis) = visibility { sqlx::query( r#" - SELECT id, name, channel_type, visibility, description, canvas, + SELECT id, name, channel_type::text AS channel_type, visibility::text AS visibility, + description, canvas, created_by, created_at, updated_at, archived_at, deleted_at, nip29_group_id, topic_required, max_members, topic, topic_set_by, topic_set_at, purpose, purpose_set_by, purpose_set_at FROM channels - WHERE deleted_at IS NULL AND visibility = ? 
+ WHERE deleted_at IS NULL AND visibility::text = $1 ORDER BY created_at DESC LIMIT 1000 "#, @@ -599,7 +587,8 @@ pub async fn list_channels( } else { sqlx::query( r#" - SELECT id, name, channel_type, visibility, description, canvas, + SELECT id, name, channel_type::text AS channel_type, visibility::text AS visibility, + description, canvas, created_by, created_at, updated_at, archived_at, deleted_at, nip29_group_id, topic_required, max_members, topic, topic_set_by, topic_set_at, @@ -619,16 +608,15 @@ pub async fn list_channels( /// Transaction-aware variant of [`get_active_role_tx`]. async fn get_active_role_tx( - tx: &mut Transaction<'_, MySql>, + tx: &mut Transaction<'_, Postgres>, channel_id: Uuid, pubkey: &[u8], ) -> Result> { - let channel_id_bytes = channel_id.as_bytes().as_slice().to_vec(); let row = sqlx::query( - "SELECT role FROM channel_members \ - WHERE channel_id = ? AND pubkey = ? AND removed_at IS NULL", + "SELECT role::text AS role FROM channel_members \ + WHERE channel_id = $1 AND pubkey = $2 AND removed_at IS NULL", ) - .bind(&channel_id_bytes) + .bind(channel_id) .bind(pubkey) .fetch_optional(&mut **tx) .await?; @@ -637,21 +625,21 @@ async fn get_active_role_tx( /// Transaction-aware variant of [`get_channel`]. async fn get_channel_tx( - tx: &mut Transaction<'_, MySql>, + tx: &mut Transaction<'_, Postgres>, channel_id: Uuid, ) -> Result { - let id_bytes = channel_id.as_bytes().as_slice().to_vec(); let row = sqlx::query( r#" - SELECT id, name, channel_type, visibility, description, canvas, + SELECT id, name, channel_type::text AS channel_type, visibility::text AS visibility, + description, canvas, created_by, created_at, updated_at, archived_at, deleted_at, nip29_group_id, topic_required, max_members, topic, topic_set_by, topic_set_at, purpose, purpose_set_by, purpose_set_at - FROM channels WHERE id = ? 
AND deleted_at IS NULL + FROM channels WHERE id = $1 AND deleted_at IS NULL "#, ) - .bind(&id_bytes) + .bind(channel_id) .fetch_optional(&mut **tx) .await? .ok_or(DbError::ChannelNotFound(channel_id))?; @@ -669,7 +657,7 @@ pub struct BotMemberRecord { pub agent_type: Option, /// Optional JSON capabilities descriptor. pub capabilities: Option, - /// Comma-separated channel names (from GROUP_CONCAT). + /// Comma-separated channel names (from string_agg). pub channel_names: String, } @@ -698,13 +686,13 @@ pub struct AccessibleChannel { /// Returns full channel records for all channels a user can access: /// open channels (visible to everyone) plus channels where the user is an active member. /// -/// Uses DISTINCT + LEFT JOIN so a user who is a member of an open channel does not -/// see it twice. Results are ordered stream → forum → dm, then alphabetically by name. +/// Uses a LEFT JOIN on channel_members (PK: channel_id + pubkey) which produces at +/// most one row per channel. Results are ordered stream -> forum -> dm, then by name. /// /// If `visibility_filter` is `Some("open")` or `Some("private")`, only channels with /// that visibility value are returned. `None` returns all accessible channels. 
pub async fn get_accessible_channels( - pool: &MySqlPool, + pool: &PgPool, pubkey: &[u8], visibility_filter: Option<&str>, member_only: Option, @@ -721,7 +709,8 @@ pub async fn get_accessible_channels( let base = format!( r#" - SELECT DISTINCT c.id, c.name, c.channel_type, c.visibility, c.description, c.canvas, + SELECT c.id, c.name, c.channel_type::text AS channel_type, + c.visibility::text AS visibility, c.description, c.canvas, c.created_by, c.created_at, c.updated_at, c.archived_at, c.deleted_at, c.nip29_group_id, c.topic_required, c.max_members, c.topic, c.topic_set_by, c.topic_set_at, @@ -729,16 +718,16 @@ pub async fn get_accessible_channels( (cm.channel_id IS NOT NULL) AS is_member FROM channels c LEFT JOIN channel_members cm - ON c.id = cm.channel_id AND cm.pubkey = ? AND cm.removed_at IS NULL + ON c.id = cm.channel_id AND cm.pubkey = $1 AND cm.removed_at IS NULL WHERE c.deleted_at IS NULL {membership_clause} "# ); let sql = if visibility_filter.is_some() { - format!("{base} AND c.visibility = ?\n ORDER BY FIELD(c.channel_type, 'stream', 'forum', 'dm'), c.name\n LIMIT 1000") + format!("{base} AND c.visibility::text = $2\n ORDER BY array_position(ARRAY['stream','forum','dm']::text[], c.channel_type::text), c.name\n LIMIT 1000") } else { - format!("{base} ORDER BY FIELD(c.channel_type, 'stream', 'forum', 'dm'), c.name\n LIMIT 1000") + format!("{base} ORDER BY array_position(ARRAY['stream','forum','dm']::text[], c.channel_type::text), c.name\n LIMIT 1000") }; let query = sqlx::query(&sql).bind(pubkey); @@ -751,7 +740,7 @@ pub async fn get_accessible_channels( let rows = query.fetch_all(pool).await?; rows.into_iter() .map(|row| { - let is_member: bool = row.try_get::("is_member").unwrap_or(0) != 0; + let is_member: bool = row.try_get("is_member").unwrap_or(false); let channel = row_to_channel_record(row)?; Ok(AccessibleChannel { channel, is_member }) }) @@ -760,13 +749,13 @@ pub async fn get_accessible_channels( /// Returns all bot-role members with their 
aggregated channel names. /// -/// Channel names are returned as a comma-separated string from GROUP_CONCAT. +/// Channel names are returned as a comma-separated string from string_agg. /// Members with no active channel memberships are excluded (INNER JOIN on channels). -pub async fn get_bot_members(pool: &MySqlPool) -> Result> { +pub async fn get_bot_members(pool: &PgPool) -> Result> { let rows = sqlx::query( r#" SELECT cm.pubkey, u.display_name, u.agent_type, u.capabilities, - GROUP_CONCAT(DISTINCT c.name ORDER BY c.name SEPARATOR ',') AS channel_names + string_agg(DISTINCT c.name, ',' ORDER BY c.name) AS channel_names FROM channel_members cm LEFT JOIN users u ON cm.pubkey = u.pubkey JOIN channels c ON cm.channel_id = c.id AND c.deleted_at IS NULL @@ -799,18 +788,18 @@ pub async fn get_bot_members(pool: &MySqlPool) -> Result> { /// Returns only users that exist in the `users` table. Ordering matches input order /// is NOT guaranteed — callers should index by pubkey if order matters. /// Returns an empty vec immediately when `pubkeys` is empty (no query issued). -pub async fn get_users_bulk(pool: &MySqlPool, pubkeys: &[Vec]) -> Result> { +pub async fn get_users_bulk(pool: &PgPool, pubkeys: &[Vec]) -> Result> { if pubkeys.is_empty() { return Ok(Vec::new()); } - // Build a parameterised IN clause: (?, ?, ...) - // Safety: placeholders are "?" markers only — all values are bound via - // `.bind()` below. No user input is interpolated into the SQL string. - let placeholders = pubkeys.iter().map(|_| "?").collect::>().join(", "); - let sql = format!( - "SELECT pubkey, display_name, avatar_url, nip05_handle FROM users WHERE pubkey IN ({placeholders})" - ); + // Build a parameterised IN clause: ($1, $2, ...) 
+ let placeholders = (1..=pubkeys.len()) + .map(|i| format!("${i}")) + .collect::>() + .join(", "); + let sql = + format!("SELECT pubkey, display_name, avatar_url, nip05_handle FROM users WHERE pubkey IN ({placeholders})"); let mut q = sqlx::query(&sql); for pk in pubkeys { @@ -831,9 +820,8 @@ pub async fn get_users_bulk(pool: &MySqlPool, pubkeys: &[Vec]) -> Result Result { - let id_bytes: Vec = row.try_get("id")?; - let id = uuid_from_bytes(&id_bytes)?; +fn row_to_channel_record(row: sqlx::postgres::PgRow) -> Result { + let id: Uuid = row.try_get("id")?; let topic_required: bool = row.try_get("topic_required")?; // topic/purpose fields are new — use try_get and fall back to None if the @@ -869,9 +857,8 @@ fn row_to_channel_record(row: sqlx::mysql::MySqlRow) -> Result { }) } -fn row_to_member_record(row: sqlx::mysql::MySqlRow) -> Result { - let channel_id_bytes: Vec = row.try_get("channel_id")?; - let channel_id = uuid_from_bytes(&channel_id_bytes)?; +fn row_to_member_record(row: sqlx::postgres::PgRow) -> Result { + let channel_id: Uuid = row.try_get("channel_id")?; Ok(MemberRecord { channel_id, @@ -898,7 +885,7 @@ pub struct ChannelUpdate { /// At least one field must be `Some`; returns `InvalidData` otherwise. /// Returns the updated `ChannelRecord` on success. pub async fn update_channel( - pool: &MySqlPool, + pool: &PgPool, channel_id: Uuid, updates: ChannelUpdate, ) -> Result { @@ -908,18 +895,20 @@ pub async fn update_channel( )); } - let id_bytes = channel_id.as_bytes().as_slice().to_vec(); - // Build SET clause dynamically — only include fields that are Some. - let mut set_parts: Vec<&str> = Vec::new(); + // Track parameter index for positional placeholders. 
+ let mut set_parts: Vec = Vec::new(); + let mut param_idx: usize = 1; if updates.name.is_some() { - set_parts.push("name = ?"); + set_parts.push(format!("name = ${param_idx}")); + param_idx += 1; } if updates.description.is_some() { - set_parts.push("description = ?"); + set_parts.push(format!("description = ${param_idx}")); + param_idx += 1; } let sql = format!( - "UPDATE channels SET {}, updated_at = NOW(6) WHERE id = ? AND deleted_at IS NULL", + "UPDATE channels SET {}, updated_at = NOW() WHERE id = ${param_idx} AND deleted_at IS NULL", set_parts.join(", ") ); @@ -930,7 +919,7 @@ pub async fn update_channel( if let Some(ref desc) = updates.description { q = q.bind(desc); } - q = q.bind(&id_bytes); + q = q.bind(channel_id); let result = q.execute(pool).await?; if result.rows_affected() == 0 { @@ -941,20 +930,14 @@ pub async fn update_channel( } /// Sets the topic for a channel, recording who set it and when. -pub async fn set_topic( - pool: &MySqlPool, - channel_id: Uuid, - topic: &str, - set_by: &[u8], -) -> Result<()> { - let id_bytes = channel_id.as_bytes().as_slice().to_vec(); +pub async fn set_topic(pool: &PgPool, channel_id: Uuid, topic: &str, set_by: &[u8]) -> Result<()> { let result = sqlx::query( - "UPDATE channels SET topic = ?, topic_set_by = ?, topic_set_at = NOW(6) \ - WHERE id = ? AND deleted_at IS NULL", + "UPDATE channels SET topic = $1, topic_set_by = $2, topic_set_at = NOW() \ + WHERE id = $3 AND deleted_at IS NULL", ) .bind(topic) .bind(set_by) - .bind(&id_bytes) + .bind(channel_id) .execute(pool) .await?; if result.rows_affected() == 0 { @@ -965,19 +948,18 @@ pub async fn set_topic( /// Sets the purpose for a channel, recording who set it and when. 
pub async fn set_purpose( - pool: &MySqlPool, + pool: &PgPool, channel_id: Uuid, purpose: &str, set_by: &[u8], ) -> Result<()> { - let id_bytes = channel_id.as_bytes().as_slice().to_vec(); let result = sqlx::query( - "UPDATE channels SET purpose = ?, purpose_set_by = ?, purpose_set_at = NOW(6) \ - WHERE id = ? AND deleted_at IS NULL", + "UPDATE channels SET purpose = $1, purpose_set_by = $2, purpose_set_at = NOW() \ + WHERE id = $3 AND deleted_at IS NULL", ) .bind(purpose) .bind(set_by) - .bind(&id_bytes) + .bind(channel_id) .execute(pool) .await?; if result.rows_affected() == 0 { @@ -990,12 +972,10 @@ pub async fn set_purpose( /// /// Returns `AccessDenied` if the channel is already archived. /// Returns `ChannelNotFound` if the channel does not exist or is deleted. -pub async fn archive_channel(pool: &MySqlPool, channel_id: Uuid) -> Result<()> { - let id_bytes = channel_id.as_bytes().as_slice().to_vec(); - +pub async fn archive_channel(pool: &PgPool, channel_id: Uuid) -> Result<()> { // First check: does the channel exist and what is its state? - let row = sqlx::query("SELECT archived_at FROM channels WHERE id = ? AND deleted_at IS NULL") - .bind(&id_bytes) + let row = sqlx::query("SELECT archived_at FROM channels WHERE id = $1 AND deleted_at IS NULL") + .bind(channel_id) .fetch_optional(pool) .await?; @@ -1012,10 +992,10 @@ pub async fn archive_channel(pool: &MySqlPool, channel_id: Uuid) -> Result<()> { } sqlx::query( - "UPDATE channels SET archived_at = NOW(6) \ - WHERE id = ? AND deleted_at IS NULL AND archived_at IS NULL", + "UPDATE channels SET archived_at = NOW() \ + WHERE id = $1 AND deleted_at IS NULL AND archived_at IS NULL", ) - .bind(&id_bytes) + .bind(channel_id) .execute(pool) .await?; @@ -1026,12 +1006,10 @@ pub async fn archive_channel(pool: &MySqlPool, channel_id: Uuid) -> Result<()> { /// /// Returns `AccessDenied` if the channel is not currently archived. /// Returns `ChannelNotFound` if the channel does not exist or is deleted. 
-pub async fn unarchive_channel(pool: &MySqlPool, channel_id: Uuid) -> Result<()> { - let id_bytes = channel_id.as_bytes().as_slice().to_vec(); - +pub async fn unarchive_channel(pool: &PgPool, channel_id: Uuid) -> Result<()> { // First check: does the channel exist and what is its state? - let row = sqlx::query("SELECT archived_at FROM channels WHERE id = ? AND deleted_at IS NULL") - .bind(&id_bytes) + let row = sqlx::query("SELECT archived_at FROM channels WHERE id = $1 AND deleted_at IS NULL") + .bind(channel_id) .fetch_optional(pool) .await?; @@ -1047,24 +1025,23 @@ pub async fn unarchive_channel(pool: &MySqlPool, channel_id: Uuid) -> Result<()> sqlx::query( "UPDATE channels SET archived_at = NULL \ - WHERE id = ? AND deleted_at IS NULL AND archived_at IS NOT NULL", + WHERE id = $1 AND deleted_at IS NULL AND archived_at IS NOT NULL", ) - .bind(&id_bytes) + .bind(channel_id) .execute(pool) .await?; Ok(()) } -/// Soft-delete a channel by setting `deleted_at = NOW(6)`. +/// Soft-delete a channel by setting `deleted_at = NOW()`. /// /// Returns `Ok(true)` if the channel was deleted, `Ok(false)` if already /// deleted or not found. -pub async fn soft_delete_channel(pool: &MySqlPool, channel_id: Uuid) -> Result { - let id_bytes = channel_id.as_bytes().as_slice().to_vec(); +pub async fn soft_delete_channel(pool: &PgPool, channel_id: Uuid) -> Result { let result = - sqlx::query("UPDATE channels SET deleted_at = NOW(6) WHERE id = ? AND deleted_at IS NULL") - .bind(&id_bytes) + sqlx::query("UPDATE channels SET deleted_at = NOW() WHERE id = $1 AND deleted_at IS NULL") + .bind(channel_id) .execute(pool) .await?; @@ -1072,12 +1049,11 @@ pub async fn soft_delete_channel(pool: &MySqlPool, channel_id: Uuid) -> Result Result { - let id_bytes = channel_id.as_bytes().as_slice().to_vec(); +pub async fn get_member_count(pool: &PgPool, channel_id: Uuid) -> Result { let row = sqlx::query( - "SELECT COUNT(*) as cnt FROM channel_members WHERE channel_id = ? 
AND removed_at IS NULL", + "SELECT COUNT(*) as cnt FROM channel_members WHERE channel_id = $1 AND removed_at IS NULL", ) - .bind(&id_bytes) + .bind(channel_id) .fetch_one(pool) .await?; Ok(row.try_get("cnt")?) @@ -1085,25 +1061,23 @@ pub async fn get_member_count(pool: &MySqlPool, channel_id: Uuid) -> Result /// Bulk-fetch member counts for a set of channel IDs. /// -/// Returns a map of `channel_id → count`. Channels with zero members are omitted. +/// Returns a map of `channel_id -> count`. Channels with zero members are omitted. /// Single query regardless of input size. pub async fn get_member_counts_bulk( - pool: &MySqlPool, + pool: &PgPool, channel_ids: &[Uuid], ) -> Result> { - use crate::event::uuid_from_bytes; - if channel_ids.is_empty() { return Ok(std::collections::HashMap::new()); } - let mut qb: sqlx::QueryBuilder = sqlx::QueryBuilder::new( + let mut qb: sqlx::QueryBuilder = sqlx::QueryBuilder::new( "SELECT channel_id, COUNT(*) as cnt FROM channel_members \ WHERE removed_at IS NULL AND channel_id IN (", ); let mut sep = qb.separated(", "); for id in channel_ids { - sep.push_bind(id.as_bytes().to_vec()); + sep.push_bind(*id); } qb.push(") GROUP BY channel_id"); @@ -1111,8 +1085,7 @@ pub async fn get_member_counts_bulk( let mut map = std::collections::HashMap::with_capacity(rows.len()); for row in rows { - let id_bytes: Vec = row.try_get("channel_id")?; - let id = uuid_from_bytes(&id_bytes)?; + let id: Uuid = row.try_get("channel_id")?; let cnt: i64 = row.try_get("cnt")?; map.insert(id, cnt); } @@ -1123,17 +1096,16 @@ pub async fn get_member_counts_bulk( /// /// Returns `None` if the pubkey is not an active member. 
pub async fn get_member_role( - pool: &MySqlPool, + pool: &PgPool, channel_id: Uuid, pubkey: &[u8], ) -> Result> { - let channel_id_bytes = channel_id.as_bytes().as_slice().to_vec(); let row = sqlx::query( - "SELECT cm.role FROM channel_members cm \ + "SELECT cm.role::text AS role FROM channel_members cm \ JOIN channels c ON cm.channel_id = c.id AND c.deleted_at IS NULL \ - WHERE cm.channel_id = ? AND cm.pubkey = ? AND cm.removed_at IS NULL", + WHERE cm.channel_id = $1 AND cm.pubkey = $2 AND cm.removed_at IS NULL", ) - .bind(&channel_id_bytes) + .bind(channel_id) .bind(pubkey) .fetch_optional(pool) .await?; diff --git a/crates/sprout-db/src/dm.rs b/crates/sprout-db/src/dm.rs index cb56d5c..37ad6b1 100644 --- a/crates/sprout-db/src/dm.rs +++ b/crates/sprout-db/src/dm.rs @@ -1,18 +1,17 @@ //! Direct message channel persistence. //! //! DMs are channels with channel_type='dm' and visibility='private'. -//! Participant sets are immutable — adding a member creates a NEW DM. +//! Participant sets are immutable -- adding a member creates a NEW DM. use chrono::{DateTime, Utc}; use sha2::{Digest, Sha256}; -use sqlx::{MySqlPool, Row}; +use sqlx::{PgPool, Row}; use uuid::Uuid; use crate::channel::ChannelRecord; use crate::error::{DbError, Result}; -use crate::event::uuid_from_bytes; -// ── Public structs ──────────────────────────────────────────────────────────── +// -- Public structs ----------------------------------------------------------- /// A DM conversation with its participant list. #[derive(Debug, Clone)] @@ -38,7 +37,7 @@ pub struct DmParticipant { pub role: String, } -// ── Pure helpers ────────────────────────────────────────────────────────────── +// -- Pure helpers ------------------------------------------------------------- /// Compute a stable SHA-256 fingerprint for a set of participant pubkeys. 
/// @@ -57,24 +56,25 @@ pub fn compute_participant_hash(pubkeys: &[&[u8]]) -> [u8; 32] { hasher.finalize().into() } -// ── DB functions ────────────────────────────────────────────────────────────── +// -- DB functions ------------------------------------------------------------- /// Find an existing DM by its participant hash. /// /// Returns `None` if no matching DM exists or if it has been deleted. pub async fn find_dm_by_participants( - pool: &MySqlPool, + pool: &PgPool, participant_hash: &[u8], ) -> Result> { let row = sqlx::query( r#" - SELECT id, name, channel_type, visibility, description, canvas, + SELECT id, name, channel_type::text AS channel_type, visibility::text AS visibility, + description, canvas, created_by, created_at, updated_at, archived_at, deleted_at, nip29_group_id, topic_required, max_members, topic, topic_set_by, topic_set_at, purpose, purpose_set_by, purpose_set_at FROM channels - WHERE participant_hash = ? + WHERE participant_hash = $1 AND channel_type = 'dm' AND deleted_at IS NULL LIMIT 1 @@ -91,11 +91,11 @@ pub async fn find_dm_by_participants( /// existing one if a DM with the same participant set already exists. /// /// Rules: -/// - `participants` must contain 2–9 entries (enforced here). +/// - `participants` must contain 2-9 entries (enforced here). /// - `created_by` must be one of the participants. -/// - The operation is idempotent: same participant set → same channel returned. +/// - The operation is idempotent: same participant set -> same channel returned. pub async fn create_dm( - pool: &MySqlPool, + pool: &PgPool, participants: &[&[u8]], created_by: &[u8], ) -> Result { @@ -125,13 +125,14 @@ pub async fn create_dm( // Idempotency check inside the transaction. 
let existing = sqlx::query( r#" - SELECT id, name, channel_type, visibility, description, canvas, + SELECT id, name, channel_type::text AS channel_type, visibility::text AS visibility, + description, canvas, created_by, created_at, updated_at, archived_at, deleted_at, nip29_group_id, topic_required, max_members, topic, topic_set_by, topic_set_at, purpose, purpose_set_by, purpose_set_at FROM channels - WHERE participant_hash = ? + WHERE participant_hash = $1 AND channel_type = 'dm' AND deleted_at IS NULL LIMIT 1 @@ -154,16 +155,15 @@ pub async fn create_dm( }; let id = Uuid::new_v4(); - let id_bytes = id.as_bytes().as_slice().to_vec(); sqlx::query( r#" INSERT INTO channels (id, name, channel_type, visibility, created_by, participant_hash) - VALUES (?, ?, 'dm', 'private', ?, ?) + VALUES ($1, $2, 'dm', 'private', $3, $4) "#, ) - .bind(&id_bytes) + .bind(id) .bind(&name) .bind(created_by) .bind(hash.as_slice()) @@ -175,14 +175,14 @@ pub async fn create_dm( sqlx::query( r#" INSERT INTO channel_members (channel_id, pubkey, role, invited_by) - VALUES (?, ?, 'member', ?) - ON DUPLICATE KEY UPDATE + VALUES ($1, $2, 'member', $3) + ON CONFLICT (channel_id, pubkey) DO UPDATE SET removed_at = NULL, removed_by = NULL, - role = VALUES(role) + role = EXCLUDED.role "#, ) - .bind(&id_bytes) + .bind(id) .bind(*pk) .bind(created_by) .execute(&mut *tx) @@ -191,15 +191,16 @@ pub async fn create_dm( let row = sqlx::query( r#" - SELECT id, name, channel_type, visibility, description, canvas, + SELECT id, name, channel_type::text AS channel_type, visibility::text AS visibility, + description, canvas, created_by, created_at, updated_at, archived_at, deleted_at, nip29_group_id, topic_required, max_members, topic, topic_set_by, topic_set_at, purpose, purpose_set_by, purpose_set_at - FROM channels WHERE id = ? 
+ FROM channels WHERE id = $1 "#, ) - .bind(&id_bytes) + .bind(id) .fetch_one(&mut *tx) .await?; @@ -213,7 +214,7 @@ pub async fn create_dm( /// Includes participant details for each DM. Supports cursor-based pagination /// using `updated_at` ordering. pub async fn list_dms_for_user( - pool: &MySqlPool, + pool: &PgPool, pubkey: &[u8], limit: u32, cursor: Option, @@ -222,9 +223,8 @@ pub async fn list_dms_for_user( // Resolve cursor to a timestamp for keyset pagination. let cursor_ts: Option> = if let Some(cid) = cursor { - let cid_bytes = cid.as_bytes().as_slice().to_vec(); - let row = sqlx::query("SELECT updated_at FROM channels WHERE id = ?") - .bind(&cid_bytes) + let row = sqlx::query("SELECT updated_at FROM channels WHERE id = $1") + .bind(cid) .fetch_optional(pool) .await?; row.map(|r| r.try_get::, _>("updated_at")) @@ -241,13 +241,13 @@ pub async fn list_dms_for_user( FROM channels c JOIN channel_members cm ON c.id = cm.channel_id - AND cm.pubkey = ? + AND cm.pubkey = $1 AND cm.removed_at IS NULL WHERE c.channel_type = 'dm' AND c.deleted_at IS NULL - AND c.updated_at < ? + AND c.updated_at < $2 ORDER BY c.updated_at DESC - LIMIT ? + LIMIT $3 "#, ) .bind(pubkey) @@ -262,12 +262,12 @@ pub async fn list_dms_for_user( FROM channels c JOIN channel_members cm ON c.id = cm.channel_id - AND cm.pubkey = ? + AND cm.pubkey = $1 AND cm.removed_at IS NULL WHERE c.channel_type = 'dm' AND c.deleted_at IS NULL ORDER BY c.updated_at DESC - LIMIT ? + LIMIT $2 "#, ) .bind(pubkey) @@ -279,23 +279,22 @@ pub async fn list_dms_for_user( let mut results = Vec::with_capacity(channel_rows.len()); for row in channel_rows { - let id_bytes: Vec = row.try_get("id")?; - let channel_id = uuid_from_bytes(&id_bytes)?; + let channel_id: Uuid = row.try_get("id")?; let created_at: DateTime = row.try_get("created_at")?; let updated_at: DateTime = row.try_get("updated_at")?; // Fetch participants for this DM. 
let member_rows = sqlx::query( r#" - SELECT cm.pubkey, cm.role, u.display_name + SELECT cm.pubkey, cm.role::text AS role, u.display_name FROM channel_members cm LEFT JOIN users u ON cm.pubkey = u.pubkey - WHERE cm.channel_id = ? + WHERE cm.channel_id = $1 AND cm.removed_at IS NULL ORDER BY cm.joined_at ASC "#, ) - .bind(&id_bytes) + .bind(channel_id) .fetch_all(pool) .await?; @@ -327,10 +326,10 @@ pub async fn list_dms_for_user( /// ensuring the caller is always a participant in their own DM. /// /// Returns `(channel, was_created)`: -/// - `was_created = true` — a new DM was created. -/// - `was_created = false` — an existing DM was returned. +/// - `was_created = true` -- a new DM was created. +/// - `was_created = false` -- an existing DM was returned. pub async fn open_dm( - pool: &MySqlPool, + pool: &PgPool, pubkeys: &[&[u8]], created_by: &[u8], ) -> Result<(ChannelRecord, bool)> { @@ -357,23 +356,13 @@ pub async fn open_dm( // Create new DM. let channel = create_dm(pool, &all, created_by).await?; - // Determine if we actually created it by checking the created_at/updated_at delta. - // A simpler approach: re-check the hash. If created_at == updated_at it's brand new. - // But the most reliable signal is whether find_dm returned None above. - // Since create_dm is idempotent (returns existing if race occurred), we check - // whether the channel was just created by comparing created_at ≈ now. - // For simplicity, we return true here — the caller treats it as "just created". - // In the race case (two concurrent open_dm calls), one will get true and one false - // (the second call's create_dm returns the existing record, but we already checked - // above and got None). This is an acceptable edge case for idempotent DM creation. 
     Ok((channel, true))
 }

-// ── Row mapping ───────────────────────────────────────────────────────────────
+// -- Row mapping --------------------------------------------------------------

-fn row_to_channel_record(row: sqlx::mysql::MySqlRow) -> Result<ChannelRecord> {
-    let id_bytes: Vec<u8> = row.try_get("id")?;
-    let id = uuid_from_bytes(&id_bytes)?;
+fn row_to_channel_record(row: sqlx::postgres::PgRow) -> Result<ChannelRecord> {
+    let id: Uuid = row.try_get("id")?;
     let topic_required: bool = row.try_get("topic_required")?;

     Ok(ChannelRecord {
@@ -400,7 +389,7 @@ fn row_to_channel_record(row: sqlx::mysql::MySqlRow) -> Result<ChannelRecord> {
     })
 }

-// ── Tests ─────────────────────────────────────────────────────────────────────
+// -- Tests --------------------------------------------------------------------

 #[cfg(test)]
 mod tests {
diff --git a/crates/sprout-db/src/error.rs b/crates/sprout-db/src/error.rs
index f8b8a2e..58ee344 100644
--- a/crates/sprout-db/src/error.rs
+++ b/crates/sprout-db/src/error.rs
@@ -9,10 +9,6 @@ pub enum DbError {
     #[error("database error: {0}")]
     Sqlx(#[from] sqlx::Error),

-    /// A SQLx migration error.
-    #[error("migration error: {0}")]
-    Migrate(#[from] sqlx::migrate::MigrateError),
-
     /// Attempted to store an AUTH event (kind 22242), which is forbidden.
     #[error("AUTH events (kind 22242) must not be stored")]
     AuthEventRejected,
diff --git a/crates/sprout-db/src/event.rs b/crates/sprout-db/src/event.rs
index 0454d69..b7cf9b1 100644
--- a/crates/sprout-db/src/event.rs
+++ b/crates/sprout-db/src/event.rs
@@ -2,11 +2,11 @@
 //!
 //! AUTH events (kind 22242) are never stored — they carry bearer tokens.
 //! Ephemeral events (kinds 20000–29999) are never stored — Redis pub/sub only.
-//! Deduplication is application-layer: INSERT IGNORE.
+//! Deduplication is application-layer: ON CONFLICT DO NOTHING.
 use chrono::{DateTime, Utc};
 use nostr::Event;
-use sqlx::{MySqlPool, QueryBuilder, Row};
+use sqlx::{PgPool, QueryBuilder, Row};
 use uuid::Uuid;

 use sprout_core::kind::{event_kind_i32, is_ephemeral, KIND_AUTH};
@@ -19,7 +19,7 @@ use crate::error::{DbError, Result};
 pub struct EventQuery {
     /// Restrict results to this channel.
     pub channel_id: Option<Uuid>,
-    /// Restrict results to these kind values (stored as `i32` in MySQL).
+    /// Restrict results to these kind values (stored as `i32` in Postgres).
     pub kinds: Option<Vec<i32>>,
     /// Restrict results to events from this pubkey.
     pub pubkey: Option<Vec<u8>>,
@@ -40,7 +40,7 @@ pub struct EventQuery {
 ///
 /// Returns `(StoredEvent, was_inserted)` — `was_inserted` is `false` on duplicate.
 pub async fn insert_event(
-    pool: &MySqlPool,
+    pool: &PgPool,
     event: &Event,
     channel_id: Option<Uuid>,
 ) -> Result<(StoredEvent, bool)> {
@@ -58,18 +58,17 @@ pub async fn insert_event(
     let pubkey_bytes = event.pubkey.to_bytes();
     let sig_bytes = event.sig.serialize();
     let tags_json = serde_json::to_value(&event.tags)?;
-    // Cast chain: nostr Kind (u16) → i32 (MySQL INT column). Safe: all Sprout kinds fit in i32.
+    // Cast chain: nostr Kind (u16) → i32 (Postgres INT column). Safe: all Sprout kinds fit in i32.
     let kind_i32 = event_kind_i32(event);
     let created_at_secs = event.created_at.as_u64() as i64;
     let created_at = DateTime::from_timestamp(created_at_secs, 0)
         .ok_or(DbError::InvalidTimestamp(created_at_secs))?;
     let received_at = Utc::now();
-    let channel_id_bytes: Option<[u8; 16]> = channel_id.map(|u| *u.as_bytes());
-
     let result = sqlx::query(
         r#"
-        INSERT IGNORE INTO events (id, pubkey, created_at, kind, tags, content, sig, received_at, channel_id)
-        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
+        INSERT INTO events (id, pubkey, created_at, kind, tags, content, sig, received_at, channel_id)
+        VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
+        ON CONFLICT DO NOTHING
         "#,
     )
     .bind(id_bytes.as_slice())
@@ -80,7 +79,7 @@ pub async fn insert_event(
     .bind(&event.content)
     .bind(sig_bytes.as_slice())
     .bind(received_at)
-    .bind(channel_id_bytes.as_ref().map(|b| b.as_slice()))
+    .bind(channel_id)
     .execute(pool)
     .await?;
@@ -96,7 +95,7 @@ pub async fn insert_event(
 ///
 /// Uses `QueryBuilder` for dynamic filter composition — avoids string concatenation
 /// while keeping all user values in bind parameters.
-pub async fn query_events(pool: &MySqlPool, q: &EventQuery) -> Result<Vec<StoredEvent>> {
+pub async fn query_events(pool: &PgPool, q: &EventQuery) -> Result<Vec<StoredEvent>> {
     // kinds:[] means "match no kinds" — return empty immediately.
     if q.kinds.as_deref().is_some_and(|k| k.is_empty()) {
         return Ok(vec![]);
@@ -105,7 +104,7 @@ pub async fn query_events(pool: &MySqlPool, q: &EventQuery) -> Result<Vec<StoredEvent>> {
-    let mut qb: QueryBuilder<MySql> = if let Some(ref p_hex) = q.p_tag_hex {
+    let mut qb: QueryBuilder<Postgres> = if let Some(ref p_hex) = q.p_tag_hex {
         // Join against event_mentions for #p-filtered queries (indexed).
         let mut b = QueryBuilder::new(
             "SELECT e.id, e.pubkey, e.created_at, e.kind, e.tags, e.content, \
@@ -128,7 +127,7 @@ pub async fn query_events(pool: &MySqlPool, q: &EventQuery) -> Result<Vec<StoredEvent>> {
     let pubkey_bytes: Vec<u8> = row.try_get("pubkey")?;
     let created_at: DateTime<Utc> = row.try_get("created_at")?;
@@ -178,10 +177,9 @@ pub(crate) fn row_to_stored_event(row: sqlx::mysql::MySqlRow) -> Result<StoredEvent> {
     let sig_bytes: Vec<u8> = row.try_get("sig")?;
     let received_at: DateTime<Utc> = row.try_get("received_at")?;
-    let channel_id_bytes: Option<Vec<u8>> = row.try_get("channel_id")?;
-    let channel_id: Option<Uuid> = channel_id_bytes.map(|b| uuid_from_bytes(&b)).transpose()?;
+    let channel_id: Option<Uuid> = row.try_get("channel_id")?;

-    // kind is stored as i32 (MySQL INT) but Nostr uses u16. Values > 65535 are corrupt.
+    // kind is stored as i32 (Postgres INT) but Nostr uses u16. Values > 65535 are corrupt.
     let kind_u16 = u16::try_from(kind_i32)
         .map_err(|_| DbError::InvalidData(format!("kind out of u16 range: {kind_i32}")))?;
@@ -212,14 +210,14 @@ pub(crate) fn row_to_stored_event(row: sqlx::mysql::MySqlRow) -> Result<StoredEvent> {
-pub async fn soft_delete_event(pool: &MySqlPool, event_id: &[u8]) -> Result<bool> {
+pub async fn soft_delete_event(pool: &PgPool, event_id: &[u8]) -> Result<bool> {
     let result =
-        sqlx::query("UPDATE events SET deleted_at = NOW(6) WHERE id = ? AND deleted_at IS NULL")
+        sqlx::query("UPDATE events SET deleted_at = NOW() WHERE id = $1 AND deleted_at IS NULL")
             .bind(event_id)
             .execute(pool)
             .await?;
@@ -233,7 +231,7 @@ pub async fn soft_delete_event(pool: &MySqlPool, event_id: &[u8]) -> Result<bool> {
 pub async fn soft_delete_event_and_update_thread(
-    pool: &MySqlPool,
+    pool: &PgPool,
     event_id: &[u8],
     parent_event_id: Option<&[u8]>,
     root_event_id: Option<&[u8]>,
@@ -241,7 +239,7 @@ pub async fn soft_delete_event_and_update_thread(
     let mut tx = pool.begin().await?;

     let result =
-        sqlx::query("UPDATE events SET deleted_at = NOW(6) WHERE id = ? AND deleted_at IS NULL")
+        sqlx::query("UPDATE events SET deleted_at = NOW() WHERE id = $1 AND deleted_at IS NULL")
             .bind(event_id)
             .execute(&mut *tx)
             .await?;
@@ -253,7 +251,7 @@ pub async fn soft_delete_event_and_update_thread(
         sqlx::query(
             "UPDATE thread_metadata \
              SET reply_count = GREATEST(reply_count - 1, 0) \
-             WHERE event_id = ?",
+             WHERE event_id = $1",
         )
         .bind(pid)
         .execute(&mut *tx)
@@ -263,7 +261,7 @@ pub async fn soft_delete_event_and_update_thread(
         sqlx::query(
             "UPDATE thread_metadata \
              SET descendant_count = GREATEST(descendant_count - 1, 0) \
-             WHERE event_id = ?",
+             WHERE event_id = $1",
         )
         .bind(root_id)
         .execute(&mut *tx)
@@ -278,16 +276,15 @@ pub async fn soft_delete_event_and_update_thread(

 /// Returns the `created_at` timestamp of the most recent non-deleted event in a channel.
 pub async fn get_last_message_at(
-    pool: &MySqlPool,
+    pool: &PgPool,
     channel_id: uuid::Uuid,
 ) -> Result<Option<DateTime<Utc>>> {
-    let id_bytes = channel_id.as_bytes().as_slice().to_vec();
     let row = sqlx::query(
         "SELECT created_at FROM events \
-         WHERE channel_id = ?
          AND deleted_at IS NULL \
+         WHERE channel_id = $1 AND deleted_at IS NULL \
          ORDER BY created_at DESC LIMIT 1",
     )
-    .bind(&id_bytes)
+    .bind(channel_id)
     .fetch_optional(pool)
     .await?;
@@ -302,20 +299,20 @@ pub async fn get_last_message_at(
 /// Returns a map of `channel_id → last_message_at`. Channels with no events are omitted.
 /// Single query regardless of input size.
 pub async fn get_last_message_at_bulk(
-    pool: &MySqlPool,
+    pool: &PgPool,
     channel_ids: &[uuid::Uuid],
 ) -> Result<std::collections::HashMap<Uuid, DateTime<Utc>>> {
     if channel_ids.is_empty() {
         return Ok(std::collections::HashMap::new());
     }

-    let mut qb: QueryBuilder<MySql> = QueryBuilder::new(
+    let mut qb: QueryBuilder<Postgres> = QueryBuilder::new(
         "SELECT channel_id, MAX(created_at) as last_at FROM events \
          WHERE deleted_at IS NULL AND channel_id IN (",
     );
     let mut sep = qb.separated(", ");
     for id in channel_ids {
-        sep.push_bind(id.as_bytes().to_vec());
+        sep.push_bind(*id);
     }
     qb.push(") GROUP BY channel_id");
@@ -323,8 +320,7 @@ pub async fn get_last_message_at_bulk(
     let mut map = std::collections::HashMap::with_capacity(rows.len());
     for row in rows {
-        let id_bytes: Vec<u8> = row.try_get("channel_id")?;
-        let id = uuid_from_bytes(&id_bytes)?;
+        let id: Uuid = row.try_get("channel_id")?;
         let last_at: DateTime<Utc> = row.try_get("last_at")?;
         map.insert(id, last_at);
     }
@@ -336,10 +332,10 @@
 /// Returns `None` if the event does not exist or has been soft-deleted.
 /// Use [`get_event_by_id_including_deleted`] when you need to inspect
 /// tombstoned rows (e.g. audit, undelete).
-pub async fn get_event_by_id(pool: &MySqlPool, id_bytes: &[u8]) -> Result<Option<StoredEvent>> {
+pub async fn get_event_by_id(pool: &PgPool, id_bytes: &[u8]) -> Result<Option<StoredEvent>> {
     let row = sqlx::query(
         "SELECT id, pubkey, created_at, kind, tags, content, sig, received_at, channel_id \
-         FROM events WHERE id = ?
AND deleted_at IS NULL ORDER BY created_at DESC LIMIT 1", + FROM events WHERE id = $1 AND deleted_at IS NULL ORDER BY created_at DESC LIMIT 1", ) .bind(id_bytes) .fetch_optional(pool) @@ -357,12 +353,12 @@ pub async fn get_event_by_id(pool: &MySqlPool, id_bytes: &[u8]) -> Result