Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/service_utils/src/middlewares.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ pub mod auth_n;
pub mod auth_z;
pub mod request_response_logging;
pub mod workspace_context;
pub mod workspace_lock;
Copy link

Copilot AI Apr 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR description says the workspace lock middleware is registered on workspace-scoped endpoints, but there are currently no references/usages of WorkspaceLockMiddlewareFactory anywhere outside its own module (search across crates/**.rs). As-is, exporting the module won’t activate locking. The middleware needs to be wired into the Actix scopes (and ordered appropriately relative to OrgWorkspaceMiddlewareFactory).

Copilot uses AI. Check for mistakes.
363 changes: 363 additions & 0 deletions crates/service_utils/src/middlewares/workspace_lock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,363 @@
use std::future::{Ready, ready};
use std::rc::Rc;

use actix_web::{
Error,
body::EitherBody,
dev::{Service, ServiceRequest, ServiceResponse, Transform, forward_ready},
error,
http::Method,
web::Data,
};
use diesel::prelude::*;
use futures_util::future::LocalBoxFuture;
use superposition_types::DBConnection;

use crate::{extensions::HttpRequestExt, service::types::AppState};

/// Middleware factory for workspace locking using PostgreSQL advisory locks.
/// This ensures all write operations (POST, PUT, DELETE, PATCH) are serialized per workspace.
pub struct WorkspaceLockMiddlewareFactory;

impl WorkspaceLockMiddlewareFactory {
pub fn new() -> Self {
Self
}
}

impl Default for WorkspaceLockMiddlewareFactory {
fn default() -> Self {
Self::new()
}
}

impl<S, B> Transform<S, ServiceRequest> for WorkspaceLockMiddlewareFactory
where
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error> + 'static,
S::Future: 'static,
B: 'static,
{
type Response = ServiceResponse<EitherBody<B>>;
type Error = Error;
type InitError = ();
type Transform = WorkspaceLockMiddleware<S>;
type Future = Ready<Result<Self::Transform, Self::InitError>>;

fn new_transform(&self, service: S) -> Self::Future {
ready(Ok(WorkspaceLockMiddleware {
service: Rc::new(service),
}))
}
}

pub struct WorkspaceLockMiddleware<S> {
service: Rc<S>,
}

impl<S, B> Service<ServiceRequest> for WorkspaceLockMiddleware<S>
where
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error> + 'static,
S::Future: 'static,
B: 'static,
{
type Response = ServiceResponse<EitherBody<B>>;
type Error = Error;
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;

forward_ready!(service);

fn call(&self, req: ServiceRequest) -> Self::Future {
let srv = self.service.clone();

Box::pin(async move {
// Only lock for write operations
let is_write_operation = match *req.method() {
Method::PUT | Method::DELETE | Method::PATCH => true,
Method::POST if is_read_only_post(req.path()) => false,
Method::POST => true,
_ => false,
};

if !is_write_operation {
// For read operations, skip locking and proceed directly
let res = srv.call(req).await?.map_into_left_body();
return Ok(res);
}

// Extract workspace and org IDs
let workspace_id = req.request().get_workspace_id();
let org_id = req.request().get_organisation_id();

// If we don't have both IDs, we can't lock - proceed without locking
// The OrgWorkspaceMiddleware will handle the validation
let lock_key = match (org_id, workspace_id) {
(Some(org), Some(workspace)) => {
Some(compute_lock_key(&org.0, &workspace.0))
}
_ => None,
};

// Get database connection from app state
let app_state = match req.app_data::<Data<AppState>>() {
Some(val) => val,
None => {
log::error!("app state not set in workspace lock middleware");
return Err(error::ErrorInternalServerError(""));
}
};

// Acquire advisory lock if we have a lock key.
// The guard takes ownership of a dedicated pooled connection so that
// no `&mut` borrow is held across the `.await` of the inner service.
let _lock_guard = if let Some(lock_key) = lock_key {
let db_conn = match app_state.db_pool.get() {
Ok(conn) => conn,
Err(e) => {
log::error!(
"failed to get database connection for workspace lock: {}",
e
);
return Err(error::ErrorInternalServerError(
"Failed to acquire database connection",
));
}
};
match acquire_advisory_lock(db_conn, lock_key).await {
Ok(guard) => {
log::debug!(
"acquired advisory lock for workspace (lock_key: {})",
lock_key
);
Some(guard)
}
Err(e) => {
log::error!(
"failed to acquire advisory lock for lock_key: {}: {}",
lock_key,
e
);
return Err(error::ErrorInternalServerError(
"Failed to acquire workspace lock",
));
}
}
} else {
None
};

// Call the actual handler
// The lock guard will automatically release the lock when dropped,
// even if the handler panics
let result = srv.call(req).await;

// Guard is dropped here, ensuring lock is always released
result.map(|r| r.map_into_left_body())
})
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}
}

#[inline]
fn is_read_only_post(path: &str) -> bool {
// Match "resolve" as a full path segment — never a bare substring.
// Handles: "/resolve", "/api/resolve", "/resolve/flags", "/v1/resolve/overrides"
// Rejects: "/resolveconfig", "/myresolve"
path.split('/').any(|segment| segment == "resolve")
}

/// Compute a stable, deterministic 64-bit advisory lock key from org_id and workspace_id.
///
/// Uses FNV-1a (Fowler–Noll–Vo) which is:
/// - Fully deterministic across all Rust versions and platforms
/// - Free of random seeding (unlike `DefaultHasher` / `SipHash`)
/// - Suitable for use as a stable identifier in distributed deployments
///
/// The key is derived from the combined string "<org_id>:<workspace_id>" so that
/// different (org, workspace) pairs always produce different keys.
fn compute_lock_key(org_id: &str, workspace_id: &str) -> i64 {
// FNV-1a constants for 64-bit variant
const FNV_OFFSET_BASIS: u64 = 14695981039346656037;
const FNV_PRIME: u64 = 1099511628211;

let mut hash = FNV_OFFSET_BASIS;
// Combine the two IDs with a separator that cannot appear in either ID
for byte in org_id
.bytes()
.chain(b":".iter().copied())
.chain(workspace_id.bytes())
{
hash ^= byte as u64;
hash = hash.wrapping_mul(FNV_PRIME);
}
// Reinterpret the u64 bits as i64 for PostgreSQL's bigint parameter
hash as i64
}

/// Error type returned by [`acquire_advisory_lock`].
///
/// Separating "the SQL call failed" from "we gave up after too many retries"
/// avoids synthesising a fake `diesel::result::Error::DatabaseError`.
#[derive(Debug)]
enum AcquireLockError {
Diesel(diesel::result::Error),
MaxRetriesExceeded(u32),
}

impl std::fmt::Display for AcquireLockError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Diesel(e) => write!(f, "database error: {}", e),
Self::MaxRetriesExceeded(attempts) => write!(
f,
"failed to acquire workspace lock after {} attempts (high contention)",
attempts
),
}
}
}

impl From<diesel::result::Error> for AcquireLockError {
fn from(e: diesel::result::Error) -> Self {
Self::Diesel(e)
}
}

/// Acquire a PostgreSQL session-level advisory lock using the single-bigint form.
///
/// Takes **ownership** of the pooled connection so that the returned
/// [`AdvisoryLockGuard`] can live across `.await` points without holding a
/// `&mut` borrow (which would create a self-referential future).
///
/// Uses `pg_try_advisory_lock(bigint)` (non-blocking) with exponential backoff.
async fn acquire_advisory_lock(
mut conn: DBConnection,
lock_key: i64,
) -> Result<AdvisoryLockGuard, AcquireLockError> {
const MAX_RETRIES: u32 = 15;
const INITIAL_BACKOFF_MS: u64 = 10;
const MAX_BACKOFF_MS: u64 = 1000;

for attempt in 0..MAX_RETRIES {
// Try to acquire the lock (non-blocking)
let lock_acquired: bool =
diesel::sql_query("SELECT pg_try_advisory_lock($1) as locked")
.bind::<diesel::sql_types::BigInt, _>(lock_key)
.get_result::<LockResult>(&mut *conn)?
.locked;

if lock_acquired {
if attempt > 0 {
log::info!(
"acquired advisory lock after {} attempts (lock_key: {})",
attempt + 1,
lock_key
);
}
return Ok(AdvisoryLockGuard { conn, lock_key });
}

// Lock not acquired, wait before retrying
if attempt < MAX_RETRIES - 1 {
let backoff_ms =
std::cmp::min(INITIAL_BACKOFF_MS * 2_u64.pow(attempt), MAX_BACKOFF_MS);
log::debug!(
"lock contention detected, retrying in {}ms (attempt {}/{}, lock_key: {})",
backoff_ms,
attempt + 1,
MAX_RETRIES,
lock_key
);
actix_web::rt::time::sleep(std::time::Duration::from_millis(backoff_ms))
.await;
}
}

Err(AcquireLockError::MaxRetriesExceeded(MAX_RETRIES))
}

// Helper struct for deserializing pg_try_advisory_lock result
#[derive(QueryableByName)]
struct LockResult {
#[diesel(sql_type = diesel::sql_types::Bool)]
locked: bool,
}

/// RAII guard that ensures the advisory lock is released when dropped.
///
/// **Owns** the pooled `DBConnection` so that:
/// 1. No `&mut` borrow is held across `.await` in the calling async block.
/// 2. If the guard is dropped (including during panic unwind), the unlock
/// query runs on the same session that acquired the lock.
/// 3. Even if the unlock query fails, dropping the owned connection returns
/// it to the pool, and the session-level lock is released when PostgreSQL
/// recycles the backend.
struct AdvisoryLockGuard {
conn: DBConnection,
lock_key: i64,
}

impl Drop for AdvisoryLockGuard {
fn drop(&mut self) {
let result = diesel::sql_query("SELECT pg_advisory_unlock($1)")
.bind::<diesel::sql_types::BigInt, _>(self.lock_key)
.execute(&mut *self.conn);

match result {
Ok(_) => log::debug!(
"released advisory lock via guard (lock_key: {})",
self.lock_key
),
Err(e) => log::error!(
"failed to release advisory lock in drop guard (lock_key: {}): {}",
self.lock_key,
e
),
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_compute_lock_key_consistency() {
let key1 = compute_lock_key("org1", "workspace1");
let key2 = compute_lock_key("org1", "workspace1");
assert_eq!(
key1, key2,
"Same inputs must always produce the same lock key"
);
}

#[test]
fn test_compute_lock_key_uniqueness() {
let key1 = compute_lock_key("org1", "workspace1");
let key2 = compute_lock_key("org1", "workspace2");
let key3 = compute_lock_key("org2", "workspace1");
let key4 = compute_lock_key("org2", "workspace2");

assert_ne!(
key1, key2,
"Different workspaces should produce different keys"
);
assert_ne!(key1, key3, "Different orgs should produce different keys");
assert_ne!(
key1, key4,
"Different combinations should produce different keys"
);
assert_ne!(key2, key3, "Different org/workspace combos should differ");
assert_ne!(key3, key4, "Different org/workspace combos should differ");
}

#[test]
fn test_compute_lock_key_separator_prevents_ambiguity() {
// "org1:" + "workspace" must differ from "org" + ":1workspace"
// i.e., swapping characters across the conceptual boundary must not collide
let key1 = compute_lock_key("org1", "workspace");
let key2 = compute_lock_key("org", "1workspace");
assert_ne!(
key1, key2,
"Keys derived from ambiguous splits of the same byte sequence must differ"
);
}
}
Loading
Loading