From a95f12566041c39f2b1f96cbe95d98043e682433 Mon Sep 17 00:00:00 2001 From: Ayush Jain <76424614+ayushjain17@users.noreply.github.com> Date: Tue, 14 Apr 2026 16:27:37 +0530 Subject: [PATCH] feat: Add workspace locking middleware using PostgreSQL advisory locks This commit introduces a new middleware that serializes all write operations (POST, PUT, DELETE, PATCH) per workspace using PostgreSQL advisory locks. Changes: - Created WorkspaceLockMiddleware that: - Extracts org_id and workspace_id from requests - Computes a unique lock key using hash of org_id:workspace_id - Acquires PostgreSQL advisory lock before processing write operations - Ensures lock is released after request completion - Skips locking for read operations (GET, etc.) - Registered the middleware on all workspace-scoped endpoints: /context, /dimension, /default-config, /config, /audit, /function, /types, /experiments, /experiment-groups, /webhook, /variables, /resolve, /auth This ensures write operations to the same workspace are serialized, preventing race conditions and maintaining data consistency. refactor: Use two-argument pg_advisory_lock for better lock space utilization Changed from single-argument pg_advisory_lock(bigint) to two-argument pg_advisory_lock(int, int) form for workspace locking. 
Benefits: - More natural mapping: org_id and workspace_id get separate hash spaces - Better lock space utilization: each ID gets full 32-bit space - Lower collision probability: separate hashing reduces conflicts - Easier debugging: both components visible in pg_locks table Implementation: - compute_lock_keys() now returns (i32, i32) tuple - org_id and workspace_id are hashed independently - Updated acquire/release functions to use two-argument SQL - Enhanced tests to verify component separation feat: Add retry logic with exponential backoff for workspace locks Changed from blocking pg_advisory_lock() to non-blocking pg_try_advisory_lock() with intelligent retry logic to prevent indefinite request blocking. **Previous Behavior:** - pg_advisory_lock() blocks indefinitely until lock is available - Requests could hang for extended periods during high contention - No visibility into lock acquisition delays - Risk of cascading timeouts **New Behavior:** - pg_try_advisory_lock() returns immediately with success/failure - Exponential backoff retry: 10ms, 20ms, 40ms, 80ms... 
up to 1000ms max - Maximum 15 attempts (total ~8 seconds max wait) - Clear error message after exhausting retries - Logs retry attempts for observability **Retry Configuration:** - MAX_RETRIES: 15 attempts - INITIAL_BACKOFF_MS: 10ms - MAX_BACKOFF_MS: 1000ms (cap to prevent excessive delays) **Benefits:** - Predictable maximum wait time (~8 seconds) - Better user experience with faster failures - Reduced risk of cascading timeouts - Visibility into lock contention via logs - Graceful degradation under high load fix: Replace blocking sleep with async sleep in lock retry logic Fixed critical async/blocking issues flagged by code review: **Issue 1: Blocking sleep in async context** - Changed std::thread::sleep() to actix_web::rt::time::sleep().await - Using blocking sleep in async middleware would block the entire worker thread - This prevented other requests from being processed on that thread - Now properly yields control back to the async executor during backoff **Issue 2: Made acquire_advisory_lock async** - Function signature changed from sync to async - Properly propagates async behavior through the call chain - Maintains non-blocking execution throughout retry attempts **Impact:** - Before: Worker threads would be blocked during lock retry delays - After: Worker threads can process other requests while waiting - Much better concurrency and throughput under lock contention fix: Add RAII guard to ensure lock release even on panic Implemented AdvisoryLockGuard using RAII pattern to guarantee lock release in all code paths, including when handlers panic. **Problem:** Previous implementation would skip lock release if the handler panicked: ```rust acquire_lock() handler() // <-- If this panics... release_lock() // <-- ...this never runs! ``` This would leave locks held until DB connection closes, potentially causing deadlocks or severe contention. 
**Solution:** Created AdvisoryLockGuard struct that implements Drop: ```rust struct AdvisoryLockGuard<'a> { conn: &'a mut PgConnection, org_key: i32, workspace_key: i32, } impl Drop for AdvisoryLockGuard<'_> { fn drop(&mut self) { // Always releases lock, even on panic release_advisory_lock(...) } } ``` **How it works:** 1. Acquire lock 2. Create guard (holds mutable reference to connection) 3. Call handler 4. Guard is automatically dropped when scope ends - On normal return: guard drops, lock released - On panic: guard drops during unwinding, lock released - On early return: guard drops, lock released **Benefits:** - Guaranteed lock cleanup in all code paths - Panic-safe resource management - Prevents lock leaks that could cause deadlocks - Follows Rust RAII best practices fix: moved intialization of lock guard to acquire call --- crates/service_utils/src/middlewares.rs | 1 + .../src/middlewares/workspace_lock.rs | 363 +++++++++++ crates/superposition/src/main.rs | 5 + tests/src/concurrency.test.ts | 576 ++++++++++++++++++ 4 files changed, 945 insertions(+) create mode 100644 crates/service_utils/src/middlewares/workspace_lock.rs create mode 100644 tests/src/concurrency.test.ts diff --git a/crates/service_utils/src/middlewares.rs b/crates/service_utils/src/middlewares.rs index c7ba3e8e4..5a78ff748 100644 --- a/crates/service_utils/src/middlewares.rs +++ b/crates/service_utils/src/middlewares.rs @@ -2,3 +2,4 @@ pub mod auth_n; pub mod auth_z; pub mod request_response_logging; pub mod workspace_context; +pub mod workspace_lock; diff --git a/crates/service_utils/src/middlewares/workspace_lock.rs b/crates/service_utils/src/middlewares/workspace_lock.rs new file mode 100644 index 000000000..569410004 --- /dev/null +++ b/crates/service_utils/src/middlewares/workspace_lock.rs @@ -0,0 +1,363 @@ +use std::future::{Ready, ready}; +use std::rc::Rc; + +use actix_web::{ + Error, + body::EitherBody, + dev::{Service, ServiceRequest, ServiceResponse, Transform, forward_ready}, + 
error, + http::Method, + web::Data, +}; +use diesel::prelude::*; +use futures_util::future::LocalBoxFuture; +use superposition_types::DBConnection; + +use crate::{extensions::HttpRequestExt, service::types::AppState}; + +/// Middleware factory for workspace locking using PostgreSQL advisory locks. +/// This ensures all write operations (POST, PUT, DELETE, PATCH) are serialized per workspace. +pub struct WorkspaceLockMiddlewareFactory; + +impl WorkspaceLockMiddlewareFactory { + pub fn new() -> Self { + Self + } +} + +impl Default for WorkspaceLockMiddlewareFactory { + fn default() -> Self { + Self::new() + } +} + +impl Transform for WorkspaceLockMiddlewareFactory +where + S: Service, Error = Error> + 'static, + S::Future: 'static, + B: 'static, +{ + type Response = ServiceResponse>; + type Error = Error; + type InitError = (); + type Transform = WorkspaceLockMiddleware; + type Future = Ready>; + + fn new_transform(&self, service: S) -> Self::Future { + ready(Ok(WorkspaceLockMiddleware { + service: Rc::new(service), + })) + } +} + +pub struct WorkspaceLockMiddleware { + service: Rc, +} + +impl Service for WorkspaceLockMiddleware +where + S: Service, Error = Error> + 'static, + S::Future: 'static, + B: 'static, +{ + type Response = ServiceResponse>; + type Error = Error; + type Future = LocalBoxFuture<'static, Result>; + + forward_ready!(service); + + fn call(&self, req: ServiceRequest) -> Self::Future { + let srv = self.service.clone(); + + Box::pin(async move { + // Only lock for write operations + let is_write_operation = match *req.method() { + Method::PUT | Method::DELETE | Method::PATCH => true, + Method::POST if is_read_only_post(req.path()) => false, + Method::POST => true, + _ => false, + }; + + if !is_write_operation { + // For read operations, skip locking and proceed directly + let res = srv.call(req).await?.map_into_left_body(); + return Ok(res); + } + + // Extract workspace and org IDs + let workspace_id = req.request().get_workspace_id(); + let org_id = 
req.request().get_organisation_id(); + + // If we don't have both IDs, we can't lock - proceed without locking + // The OrgWorkspaceMiddleware will handle the validation + let lock_key = match (org_id, workspace_id) { + (Some(org), Some(workspace)) => { + Some(compute_lock_key(&org.0, &workspace.0)) + } + _ => None, + }; + + // Get database connection from app state + let app_state = match req.app_data::>() { + Some(val) => val, + None => { + log::error!("app state not set in workspace lock middleware"); + return Err(error::ErrorInternalServerError("")); + } + }; + + // Acquire advisory lock if we have a lock key. + // The guard takes ownership of a dedicated pooled connection so that + // no `&mut` borrow is held across the `.await` of the inner service. + let _lock_guard = if let Some(lock_key) = lock_key { + let db_conn = match app_state.db_pool.get() { + Ok(conn) => conn, + Err(e) => { + log::error!( + "failed to get database connection for workspace lock: {}", + e + ); + return Err(error::ErrorInternalServerError( + "Failed to acquire database connection", + )); + } + }; + match acquire_advisory_lock(db_conn, lock_key).await { + Ok(guard) => { + log::debug!( + "acquired advisory lock for workspace (lock_key: {})", + lock_key + ); + Some(guard) + } + Err(e) => { + log::error!( + "failed to acquire advisory lock for lock_key: {}: {}", + lock_key, + e + ); + return Err(error::ErrorInternalServerError( + "Failed to acquire workspace lock", + )); + } + } + } else { + None + }; + + // Call the actual handler + // The lock guard will automatically release the lock when dropped, + // even if the handler panics + let result = srv.call(req).await; + + // Guard is dropped here, ensuring lock is always released + result.map(|r| r.map_into_left_body()) + }) + } +} + +#[inline] +fn is_read_only_post(path: &str) -> bool { + // Match "resolve" as a full path segment — never a bare substring. 
+ // Handles: "/resolve", "/api/resolve", "/resolve/flags", "/v1/resolve/overrides" + // Rejects: "/resolveconfig", "/myresolve" + path.split('/').any(|segment| segment == "resolve") +} + +/// Compute a stable, deterministic 64-bit advisory lock key from org_id and workspace_id. +/// +/// Uses FNV-1a (Fowler–Noll–Vo) which is: +/// - Fully deterministic across all Rust versions and platforms +/// - Free of random seeding (unlike `DefaultHasher` / `SipHash`) +/// - Suitable for use as a stable identifier in distributed deployments +/// +/// The key is derived from the combined string ":" so that +/// different (org, workspace) pairs always produce different keys. +fn compute_lock_key(org_id: &str, workspace_id: &str) -> i64 { + // FNV-1a constants for 64-bit variant + const FNV_OFFSET_BASIS: u64 = 14695981039346656037; + const FNV_PRIME: u64 = 1099511628211; + + let mut hash = FNV_OFFSET_BASIS; + // Combine the two IDs with a separator that cannot appear in either ID + for byte in org_id + .bytes() + .chain(b":".iter().copied()) + .chain(workspace_id.bytes()) + { + hash ^= byte as u64; + hash = hash.wrapping_mul(FNV_PRIME); + } + // Reinterpret the u64 bits as i64 for PostgreSQL's bigint parameter + hash as i64 +} + +/// Error type returned by [`acquire_advisory_lock`]. +/// +/// Separating "the SQL call failed" from "we gave up after too many retries" +/// avoids synthesising a fake `diesel::result::Error::DatabaseError`. 
+#[derive(Debug)] +enum AcquireLockError { + Diesel(diesel::result::Error), + MaxRetriesExceeded(u32), +} + +impl std::fmt::Display for AcquireLockError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Diesel(e) => write!(f, "database error: {}", e), + Self::MaxRetriesExceeded(attempts) => write!( + f, + "failed to acquire workspace lock after {} attempts (high contention)", + attempts + ), + } + } +} + +impl From for AcquireLockError { + fn from(e: diesel::result::Error) -> Self { + Self::Diesel(e) + } +} + +/// Acquire a PostgreSQL session-level advisory lock using the single-bigint form. +/// +/// Takes **ownership** of the pooled connection so that the returned +/// [`AdvisoryLockGuard`] can live across `.await` points without holding a +/// `&mut` borrow (which would create a self-referential future). +/// +/// Uses `pg_try_advisory_lock(bigint)` (non-blocking) with exponential backoff. +async fn acquire_advisory_lock( + mut conn: DBConnection, + lock_key: i64, +) -> Result { + const MAX_RETRIES: u32 = 15; + const INITIAL_BACKOFF_MS: u64 = 10; + const MAX_BACKOFF_MS: u64 = 1000; + + for attempt in 0..MAX_RETRIES { + // Try to acquire the lock (non-blocking) + let lock_acquired: bool = + diesel::sql_query("SELECT pg_try_advisory_lock($1) as locked") + .bind::(lock_key) + .get_result::(&mut *conn)? 
+ .locked; + + if lock_acquired { + if attempt > 0 { + log::info!( + "acquired advisory lock after {} attempts (lock_key: {})", + attempt + 1, + lock_key + ); + } + return Ok(AdvisoryLockGuard { conn, lock_key }); + } + + // Lock not acquired, wait before retrying + if attempt < MAX_RETRIES - 1 { + let backoff_ms = + std::cmp::min(INITIAL_BACKOFF_MS * 2_u64.pow(attempt), MAX_BACKOFF_MS); + log::debug!( + "lock contention detected, retrying in {}ms (attempt {}/{}, lock_key: {})", + backoff_ms, + attempt + 1, + MAX_RETRIES, + lock_key + ); + actix_web::rt::time::sleep(std::time::Duration::from_millis(backoff_ms)) + .await; + } + } + + Err(AcquireLockError::MaxRetriesExceeded(MAX_RETRIES)) +} + +// Helper struct for deserializing pg_try_advisory_lock result +#[derive(QueryableByName)] +struct LockResult { + #[diesel(sql_type = diesel::sql_types::Bool)] + locked: bool, +} + +/// RAII guard that ensures the advisory lock is released when dropped. +/// +/// **Owns** the pooled `DBConnection` so that: +/// 1. No `&mut` borrow is held across `.await` in the calling async block. +/// 2. If the guard is dropped (including during panic unwind), the unlock +/// query runs on the same session that acquired the lock. +/// 3. Even if the unlock query fails, dropping the owned connection returns +/// it to the pool, and the session-level lock is released when PostgreSQL +/// recycles the backend. 
+struct AdvisoryLockGuard { + conn: DBConnection, + lock_key: i64, +} + +impl Drop for AdvisoryLockGuard { + fn drop(&mut self) { + let result = diesel::sql_query("SELECT pg_advisory_unlock($1)") + .bind::(self.lock_key) + .execute(&mut *self.conn); + + match result { + Ok(_) => log::debug!( + "released advisory lock via guard (lock_key: {})", + self.lock_key + ), + Err(e) => log::error!( + "failed to release advisory lock in drop guard (lock_key: {}): {}", + self.lock_key, + e + ), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_compute_lock_key_consistency() { + let key1 = compute_lock_key("org1", "workspace1"); + let key2 = compute_lock_key("org1", "workspace1"); + assert_eq!( + key1, key2, + "Same inputs must always produce the same lock key" + ); + } + + #[test] + fn test_compute_lock_key_uniqueness() { + let key1 = compute_lock_key("org1", "workspace1"); + let key2 = compute_lock_key("org1", "workspace2"); + let key3 = compute_lock_key("org2", "workspace1"); + let key4 = compute_lock_key("org2", "workspace2"); + + assert_ne!( + key1, key2, + "Different workspaces should produce different keys" + ); + assert_ne!(key1, key3, "Different orgs should produce different keys"); + assert_ne!( + key1, key4, + "Different combinations should produce different keys" + ); + assert_ne!(key2, key3, "Different org/workspace combos should differ"); + assert_ne!(key3, key4, "Different org/workspace combos should differ"); + } + + #[test] + fn test_compute_lock_key_separator_prevents_ambiguity() { + // "org1:" + "workspace" must differ from "org" + ":1workspace" + // i.e., swapping characters across the conceptual boundary must not collide + let key1 = compute_lock_key("org1", "workspace"); + let key2 = compute_lock_key("org", "1workspace"); + assert_ne!( + key1, key2, + "Keys derived from ambiguous splits of the same byte sequence must differ" + ); + } +} diff --git a/crates/superposition/src/main.rs b/crates/superposition/src/main.rs index 
44d32f407..fa033f0ce 100644 --- a/crates/superposition/src/main.rs +++ b/crates/superposition/src/main.rs @@ -30,6 +30,7 @@ use service_utils::{ auth_z::{AuthZHandler, AuthZManager, is_auth_z_enabled}, request_response_logging::RequestResponseLogger, workspace_context::OrgWorkspaceMiddlewareFactory, + workspace_lock::WorkspaceLockMiddlewareFactory, }, service::types::AppEnv, }; @@ -238,21 +239,25 @@ impl ScopeExt for Scope { fn resource_routes_workspace_specific(self, auth_z_manager: AuthZManager) -> Self { self.service( scope("/context") + .wrap(WorkspaceLockMiddlewareFactory::new()) .wrap(OrgWorkspaceMiddlewareFactory::new(true, true)) .service(context::endpoints()), ) .service( scope("/dimension") + .wrap(WorkspaceLockMiddlewareFactory::new()) .wrap(OrgWorkspaceMiddlewareFactory::new(true, true)) .service(dimension::endpoints()), ) .service( scope("/default-config") + .wrap(WorkspaceLockMiddlewareFactory::new()) .wrap(OrgWorkspaceMiddlewareFactory::new(true, true)) .service(default_config::endpoints()), ) .service( scope("/config") + .wrap(WorkspaceLockMiddlewareFactory::new()) .wrap(OrgWorkspaceMiddlewareFactory::new(true, true)) .service(config::endpoints()), ) diff --git a/tests/src/concurrency.test.ts b/tests/src/concurrency.test.ts new file mode 100644 index 000000000..0fbf5350d --- /dev/null +++ b/tests/src/concurrency.test.ts @@ -0,0 +1,576 @@ +/** + * Concurrency integration tests for the workspace advisory lock middleware. + * + * These tests fire concurrent write requests at the Superposition API and + * verify the advisory lock prevents TOCTOU (time-of-check-time-of-use) + * race conditions that cannot be caught by PostgreSQL row-level locks alone. 
+ * + * Requires: a running Superposition server at http://127.0.0.1:8080 + * + * Run with: + * cd tests && bun test src/concurrency.test.ts + */ + +import { + CreateContextCommand, + CreateDefaultConfigCommand, + CreateDimensionCommand, + DeleteContextCommand, + DeleteDefaultConfigCommand, + DeleteDimensionCommand, + GetContextCommand, + ListDefaultConfigsCommand +} from "@juspay/superposition-sdk"; +import { + afterAll, + beforeAll, + describe, + expect, + setDefaultTimeout, + test, +} from "bun:test"; +import { ENV, superpositionClient } from "../env.ts"; + +// ─── Helpers ────────────────────────────────────────────────────────────────── + +const client = superpositionClient; +const suffix = Math.random().toString(36).substring(2, 8); + +function orgId() { + return ENV.org_id; +} +function wsId() { + return ENV.workspace_id; +} + +/** + * Settle an array of promises and split into fulfilled / rejected. + */ +async function settleAll( + promises: Promise[] +): Promise<{ fulfilled: T[]; rejected: any[] }> { + const results = await Promise.allSettled(promises); + const fulfilled: T[] = []; + const rejected: any[] = []; + for (const r of results) { + if (r.status === "fulfilled") fulfilled.push(r.value); + else rejected.push(r.reason); + } + return { fulfilled, rejected }; +} + +// ─── Resource tracking for cleanup ────────────────────────────────────────── + +const createdDimensions: string[] = []; +const createdDefaultConfigs: string[] = []; +const createdContextIds: Set = new Set(); + +function trackContext(id: string | undefined) { + if (id) createdContextIds.add(id); +} + +// ─── Test Suite ─────────────────────────────────────────────────────────────── + +describe("Concurrency: Advisory Lock Integration Tests", () => { + setDefaultTimeout(120_000); + + // -- Setup ---------------------------------------------------------------- + // Create a dimension ("concDim") and two default configs ("concKey1", + // "concKey2") that all the concurrency tests will share. 
+ + beforeAll(async () => { + console.log( + `[concurrency] org=${orgId()} workspace=${wsId()} suffix=${suffix}` + ); + + // Dimension + try { + await client.send( + new CreateDimensionCommand({ + workspace_id: wsId(), + org_id: orgId(), + dimension: `concDim_${suffix}`, + schema: { type: "string" }, + position: 1, + description: "Dimension for concurrency tests", + change_reason: "concurrency test setup", + }) + ); + createdDimensions.push(`concDim_${suffix}`); + } catch (e: any) { + if (!e.message?.includes("duplicate key")) throw e; + console.log(`concDim_${suffix} already exists`); + createdDimensions.push(`concDim_${suffix}`); + } + + // Default configs + for (const key of [`concKey1_${suffix}`, `concKey2_${suffix}`]) { + try { + await client.send( + new CreateDefaultConfigCommand({ + workspace_id: wsId(), + org_id: orgId(), + key, + schema: { type: "string" }, + value: `default_${key}`, + description: `Default config for concurrency tests`, + change_reason: "concurrency test setup", + }) + ); + createdDefaultConfigs.push(key); + } catch (e: any) { + if (!e.message?.includes("duplicate key")) throw e; + console.log(`${key} already exists`); + createdDefaultConfigs.push(key); + } + } + }); + + // -- Cleanup -------------------------------------------------------------- + + afterAll(async () => { + console.log("[concurrency] Cleaning up..."); + + // Delete all tracked contexts first + for (const id of createdContextIds) { + try { + await client.send( + new DeleteContextCommand({ + workspace_id: wsId(), + org_id: orgId(), + id, + }) + ); + } catch (_) {} + } + + // Delete default configs + for (const key of createdDefaultConfigs) { + try { + await client.send( + new DeleteDefaultConfigCommand({ + workspace_id: wsId(), + org_id: orgId(), + key, + }) + ); + } catch (_) {} + } + + // Delete dimensions (reverse order) + for (let i = createdDimensions.length - 1; i >= 0; i--) { + try { + await client.send( + new DeleteDimensionCommand({ + workspace_id: wsId(), + 
org_id: orgId(), + dimension: createdDimensions[i], + }) + ); + } catch (_) {} + } + }); + + // ───────────────────────────────────────────────────────────────────────── + // TEST 1: Delete-default-config + create-context TOCTOU race + // + // Without the advisory lock, this interleaving is possible: + // + // Req A (delete): checks "no contexts reference key" → 0 found ✓ + // Req B (create): inserts context referencing key ✓ + // Req A (delete): deletes default config row ✓ ← orphan! + // + // With the lock, requests are serialized per workspace so either: + // • A completes first → B fails ("config key doesn't exist") + // • B completes first → A fails ("key in use by contexts") + // + // In BOTH valid outcomes the invariant holds: + // "a context's override keys are always backed by a default config" + // ───────────────────────────────────────────────────────────────────────── + describe("TOCTOU: delete-config vs create-context", () => { + const ROUNDS = 10; + + for (let round = 0; round < ROUNDS; round++) { + test(`round ${round + 1}: no orphaned context after concurrent delete+create`, async () => { + const configKey = `toctouKey_${suffix}_r${round}`; + const dimValue = `val_r${round}`; + + // ---- setup: create a fresh default config for this round ---- + await client.send( + new CreateDefaultConfigCommand({ + workspace_id: wsId(), + org_id: orgId(), + key: configKey, + schema: { type: "string" }, + value: "default", + description: "TOCTOU race test", + change_reason: "concurrency test", + }) + ); + + // ---- fire both requests concurrently ---- + const deletePromise = client + .send( + new DeleteDefaultConfigCommand({ + workspace_id: wsId(), + org_id: orgId(), + key: configKey, + }) + ) + .then(() => "deleted" as const) + .catch((e: any) => ({ error: "delete" as const, detail: e })); + + const createPromise = client + .send( + new CreateContextCommand({ + workspace_id: wsId(), + org_id: orgId(), + request: { + context: { + [`concDim_${suffix}`]: dimValue, + 
}, + override: { + [configKey]: `override_${round}`, + }, + description: "TOCTOU race context", + change_reason: "concurrency test", + }, + }) + ) + .then((res: any) => { + trackContext(res.id); + return { created: true, id: res.id } as const; + }) + .catch((e: any) => ({ error: "create" as const, detail: e })); + + const [deleteResult, createResult] = await Promise.all([ + deletePromise, + createPromise, + ]); + + // ---- verify the invariant ---- + // Check if the default config still exists + let configExists = false; + try { + const listResp = await client.send( + new ListDefaultConfigsCommand({ + workspace_id: wsId(), + org_id: orgId(), + }) + ); + configExists = + listResp.data?.some((c: any) => c.key === configKey) ?? + false; + } catch (_) { + configExists = false; + } + + // Check if the context exists (by trying to fetch it) + let contextExists = false; + if ( + typeof createResult === "object" && + "created" in createResult && + createResult.id + ) { + try { + const getResp = await client.send( + new GetContextCommand({ + workspace_id: wsId(), + org_id: orgId(), + id: createResult.id, + }) + ); + contextExists = getResp.$metadata.httpStatusCode === 200; + } catch (_) { + contextExists = false; + } + } + + // THE INVARIANT: + // It must NEVER be the case that the context exists + // but its backing default config does not. 
+ if (contextExists) { + expect(configExists).toBe(true); + console.log( + ` round ${round + 1}: context won (created first, delete was blocked)` + ); + + // Cleanup: delete context, then config + if ( + typeof createResult === "object" && + "id" in createResult && + createResult.id + ) { + try { + await client.send( + new DeleteContextCommand({ + workspace_id: wsId(), + org_id: orgId(), + id: createResult.id, + }) + ); + createdContextIds.delete(createResult.id); + } catch (_) {} + } + try { + await client.send( + new DeleteDefaultConfigCommand({ + workspace_id: wsId(), + org_id: orgId(), + key: configKey, + }) + ); + } catch (_) {} + } else { + // Context doesn't exist → config may or may not exist (both are valid) + console.log( + ` round ${round + 1}: delete won (${configExists ? "config still exists" : "config deleted"})` + ); + // Clean up config if it still exists + if (configExists) { + try { + await client.send( + new DeleteDefaultConfigCommand({ + workspace_id: wsId(), + org_id: orgId(), + key: configKey, + }) + ); + } catch (_) {} + } + } + }); + } + }); + + // ───────────────────────────────────────────────────────────────────────── + // TEST 2: Concurrent context creates all succeed without conflicts + // + // Fire N CreateContextCommands simultaneously — each with a distinct + // condition but referencing the same default config key. All should + // succeed (no lost writes, no 500s from version conflicts). 
+ // ───────────────────────────────────────────────────────────────────────── + describe("Concurrent context creates", () => { + const N = 8; + + test(`${N} concurrent context creates should all succeed`, async () => { + const configKey = `concKey1_${suffix}`; + + const promises = Array.from({ length: N }, (_, i) => + client + .send( + new CreateContextCommand({ + workspace_id: wsId(), + org_id: orgId(), + request: { + context: { + [`concDim_${suffix}`]: `concurrent_${suffix}_${i}`, + }, + override: { + [configKey]: `value_${i}`, + }, + description: `Concurrent create #${i}`, + change_reason: "concurrency test", + }, + }) + ) + .then((res: any) => { + trackContext(res.id); + return res; + }) + ); + + const { fulfilled, rejected } = await settleAll(promises); + + console.log( + ` ${fulfilled.length} succeeded, ${rejected.length} failed` + ); + for (const r of rejected) { + console.log(` rejection: ${r?.message ?? r}`); + } + + // ALL should succeed — the advisory lock serializes them so they + // don't trip over each other's config_versions inserts. + expect(fulfilled.length).toBe(N); + expect(rejected.length).toBe(0); + + // Verify all contexts are actually persisted + for (const res of fulfilled) { + const getResp = await client.send( + new GetContextCommand({ + workspace_id: wsId(), + org_id: orgId(), + id: (res as any).id!, + }) + ); + expect(getResp.$metadata.httpStatusCode).toBe(200); + } + }); + }); + + // ───────────────────────────────────────────────────────────────────────── + // TEST 3: Concurrent create + delete of the SAME context + // + // Create a context, then fire delete and update-override simultaneously. + // Either: + // • delete wins → update fails (context not found) + // • update wins → delete may succeed after update completes + // But never: update succeeds on a deleted context. 
+ // ───────────────────────────────────────────────────────────────────────── + describe("Concurrent delete vs update-override", () => { + test("delete and update-override on same context produce consistent state", async () => { + const configKey = `concKey1_${suffix}`; + + // Create a context to race on + const createResp = await client.send( + new CreateContextCommand({ + workspace_id: wsId(), + org_id: orgId(), + request: { + context: { + [`concDim_${suffix}`]: `race_target_${suffix}`, + }, + override: { + [configKey]: "original", + }, + description: "Race target context", + change_reason: "concurrency test", + }, + }) + ); + const ctxId = createResp.id!; + trackContext(ctxId); + + // Fire delete + create-with-same-condition concurrently + const deletePromise = client + .send( + new DeleteContextCommand({ + workspace_id: wsId(), + org_id: orgId(), + id: ctxId, + }) + ) + .then(() => "deleted" as const) + .catch((e: any) => ({ error: "delete" as const, detail: e })); + + const createPromise = client + .send( + new CreateContextCommand({ + workspace_id: wsId(), + org_id: orgId(), + request: { + context: { + [`concDim_${suffix}`]: `race_target_${suffix}`, + }, + override: { + [configKey]: "updated_via_race", + }, + description: "Raced update", + change_reason: "concurrency test", + }, + }) + ) + .then((res: any) => { + trackContext(res.id); + return { recreated: true, id: res.id } as const; + }) + .catch((e: any) => ({ error: "create" as const, detail: e })); + + const [deleteResult, createResult] = await Promise.all([ + deletePromise, + createPromise, + ]); + + // Verify consistency: try to fetch the context + let finalContext: any = null; + // The context could exist under the original or a new ID + const possibleIds = [ctxId]; + if ( + typeof createResult === "object" && + "id" in createResult && + createResult.id + ) { + possibleIds.push(createResult.id); + } + + for (const id of possibleIds) { + try { + finalContext = await client.send( + new 
GetContextCommand({ + workspace_id: wsId(), + org_id: orgId(), + id, + }) + ); + break; + } catch (_) {} + } + + if (finalContext) { + // Context exists — its override should be internally consistent + expect(finalContext.override).toBeDefined(); + console.log( + ` context survived with override: ${JSON.stringify(finalContext.override)}` + ); + } else { + // Context was deleted — that's also a valid outcome + console.log(` context was deleted (delete won the race)`); + } + + // Either way, no 500 errors should have occurred — both + // results should be either success or a 4xx client error + for (const result of [deleteResult, createResult]) { + if (typeof result === "object" && "error" in result) { + const status = + result.detail?.$response?.statusCode ?? + result.detail?.$metadata?.httpStatusCode; + expect(status).not.toBe(500); + } + } + }); + }); + + // ───────────────────────────────────────────────────────────────────────── + // TEST 4: Multiple rounds of create-then-delete to stress the lock + // release path (guard Drop) + // + // If the advisory lock is NOT properly released after each request, + // subsequent requests will time out with a 500 error. 
+ // ───────────────────────────────────────────────────────────────────────── + describe("Lock release validation", () => { + test("20 sequential create+delete cycles should all succeed (lock is released each time)", async () => { + const configKey = `concKey2_${suffix}`; + const CYCLES = 20; + + for (let i = 0; i < CYCLES; i++) { + const resp = await client.send( + new CreateContextCommand({ + workspace_id: wsId(), + org_id: orgId(), + request: { + context: { + [`concDim_${suffix}`]: `release_test_${suffix}_${i}`, + }, + override: { + [configKey]: `cycle_${i}`, + }, + description: `Lock release test cycle ${i}`, + change_reason: "concurrency test", + }, + }) + ); + expect(resp.$metadata.httpStatusCode).toBe(200); + + await client.send( + new DeleteContextCommand({ + workspace_id: wsId(), + org_id: orgId(), + id: resp.id!, + }) + ); + } + // If we get here without a 500 timeout, the lock is released correctly + console.log(` ${CYCLES} create+delete cycles completed successfully`); + }); + }); +});