diff --git a/include/elio/coro/frame_allocator.hpp b/include/elio/coro/frame_allocator.hpp index 43fb44e..88177d1 100644 --- a/include/elio/coro/frame_allocator.hpp +++ b/include/elio/coro/frame_allocator.hpp @@ -8,6 +8,17 @@ #include #include +// Architecture-specific CPU pause/yield hint for tight spin loops. +// Reduces power consumption and allows the HT sibling to run. +#if defined(__x86_64__) || defined(__i386__) +# define ELIO_CPU_PAUSE() __builtin_ia32_pause() +#elif defined(__aarch64__) || defined(__arm__) +# define ELIO_CPU_PAUSE() __asm__ __volatile__("yield" ::: "memory") +#else +# include +# define ELIO_CPU_PAUSE() std::this_thread::yield() +#endif + namespace elio::coro { /// Thread-local free-list based frame allocator for small coroutine frames @@ -167,14 +178,19 @@ class frame_allocator { while (head && count < REMOTE_QUEUE_BATCH && free_count_ < POOL_SIZE) { block_header* next = head->next.load(std::memory_order_acquire); - // If next is null but tail points elsewhere, spin briefly - // (producer is in the middle of push) + // If next is null but tail points elsewhere, the producer is in the + // middle of push() (has done the tail exchange but not yet written + // prev->next). Spin briefly with a CPU pause hint. if (!next && remote_tail_.load(std::memory_order_acquire) != head) { - // Spin wait for producer to complete - for (int i = 0; i < 100 && !head->next.load(std::memory_order_acquire); ++i) { - // Brief spin + for (int i = 0; i < 16; ++i) { + ELIO_CPU_PAUSE(); + next = head->next.load(std::memory_order_acquire); + if (next) break; } - next = head->next.load(std::memory_order_acquire); + // If the link still isn't ready, stop without consuming 'head'. + // Consuming it would leave the queue in a broken state because + // the producer would later write through a recycled pointer. + if (!next) break; } pool_[free_count_++] = head; @@ -190,12 +206,16 @@ class frame_allocator { while (head) { block_header* next = head->next.load(std::memory_order_acquire); - // If next is null but tail points elsewhere, spin briefly + // Same safe spin pattern as reclaim_remote_returns(), but with more + // retries because we're in teardown and really want to drain the queue. if (!next && remote_tail_.load(std::memory_order_acquire) != head) { - for (int i = 0; i < 1000 && !head->next.load(std::memory_order_acquire); ++i) { - // Brief spin + for (int i = 0; i < 32; ++i) { + ELIO_CPU_PAUSE(); + next = head->next.load(std::memory_order_acquire); + if (next) break; } - next = head->next.load(std::memory_order_acquire); + // Stop safely rather than risk corrupting a partially-linked node. + if (!next) break; } if (free_count_ < POOL_SIZE) { diff --git a/include/elio/coro/task.hpp b/include/elio/coro/task.hpp index 332d4f7..212f593 100644 --- a/include/elio/coro/task.hpp +++ b/include/elio/coro/task.hpp @@ -52,7 +52,11 @@ template struct join_state { std::optional value_; std::exception_ptr exception_; - std::atomic waiter_{nullptr}; // Stores coroutine_handle address + // waiter_/completed_ are the hot path: written by the coroutine producer + // (complete()) and the awaiting consumer (set_waiter()). Aligning them + // to a new cache line separates them from value_/exception_ which may + // be large and written only once, preventing false sharing. + alignas(64) std::atomic waiter_{nullptr}; // Stores coroutine_handle address std::atomic completed_{false}; void set_value(T&& value) { @@ -109,7 +113,8 @@ struct join_state { template<> struct join_state { std::exception_ptr exception_; - std::atomic waiter_{nullptr}; + // Hot atomics on their own cache line (see join_state comment above) + alignas(64) std::atomic waiter_{nullptr}; std::atomic completed_{false}; void set_value() { diff --git a/include/elio/io/io_uring_backend.hpp b/include/elio/io/io_uring_backend.hpp index 2e4fff4..95f4e63 100644 --- a/include/elio/io/io_uring_backend.hpp +++ b/include/elio/io/io_uring_backend.hpp @@ -17,12 +17,15 @@ #include #include #include +#include +#include namespace elio::io { -/// Number of shards for resume tracking map to reduce lock contention -/// Using a power of 2 allows fast modulo via bitwise AND -static constexpr size_t kResumeTrackingShards = 16; +/// Number of shards for resume tracking map to reduce lock contention. +/// 64 shards accommodate up to ~64-core machines without measurable contention; +/// must be a power of 2 for efficient bitwise modulo. +static constexpr size_t kResumeTrackingShards = 64; /// Get the shard index for a given user_data pointer inline size_t get_resume_tracking_shard(const void* user_data) { @@ -57,7 +60,15 @@ class io_uring_backend : public io_backend { public: /// Configuration options struct config { - uint32_t queue_depth = 256; ///< SQ/CQ depth + /// SQ/CQ depth: defaults to 512 * num_hw_threads, clamped to [1024, 32768]. + /// A larger ring reduces SQE exhaustion under high concurrency without + /// wasting memory on small machines. + uint32_t queue_depth = []() -> uint32_t { + unsigned hw = std::thread::hardware_concurrency(); + if (hw == 0) hw = 4; // conservative default if detection fails + auto d = static_cast(hw) * 512u; + return std::clamp(d, 1024u, 32768u); + }(); uint32_t flags = 0; ///< io_uring_setup flags bool sq_poll = false; ///< Enable SQ polling (requires privileges) }; diff --git a/include/elio/runtime/chase_lev_deque.hpp b/include/elio/runtime/chase_lev_deque.hpp index cea4f75..e28c40c 100644 --- a/include/elio/runtime/chase_lev_deque.hpp +++ b/include/elio/runtime/chase_lev_deque.hpp @@ -199,24 +199,46 @@ class chase_lev_deque { return nullptr; } - /// Steal multiple elements at once (thieves only) - lock-free - /// WARNING: This function has a race condition with pop() when the owner - /// pops items between when we read bottom and when we load items. - /// Use steal() for single-item stealing which is properly synchronized. - /// This function is kept for reference but should not be used. + /// Steal up to N elements atomically (thieves only) - lock-free + /// + /// Uses a single CAS on top_ to claim min(size/2, N, 1) slots at once, + /// which dramatically reduces CAS overhead compared to N individual steal() + /// calls when many items are available. + /// + /// Safety: the CAS atomically reserves the range [t, t+batch); the owner + /// pops from the bottom (indices ≥ new bottom_), so there is no overlap + /// as long as batch ≤ available = b − t, which is enforced below. + /// + /// Returns the number of items stored in output[0..N-1]. template - [[deprecated("Use steal() instead - steal_batch has race conditions with pop()")]] size_t steal_batch(std::array& output) noexcept { - size_t stolen = 0; - - while (stolen < N) { - // Single-item steal to avoid race with owner's pop - T* item = steal(); - if (!item) break; - output[stolen++] = item; + size_t t = top_.load(std::memory_order_acquire); + // seq_cst fence: required for correctness with pop() (same as steal()) + std::atomic_thread_fence(std::memory_order_seq_cst); + size_t b = bottom_.load(std::memory_order_acquire); + + if (t >= b) return 0; // empty + + size_t available = b - t; + // Steal at most half the queue (work-stealing heuristic), at least 1 + size_t batch = std::min(std::max(available / 2, size_t{1}), N); + + // Atomically claim [t, t+batch) by advancing top_ + if (!top_.compare_exchange_strong(t, t + batch, + std::memory_order_acq_rel, std::memory_order_relaxed)) { + return 0; // Lost race with another thief or owner } - - return stolen; + + // We exclusively own slots [t, t+batch). Load items from the buffer. + // buffer_.load(acquire) ensures we see any resize that happened before + // our CAS; old buffers are kept alive in old_buffers_ so this is safe + // even if a resize happened concurrently. + circular_buffer* buf = buffer_.load(std::memory_order_acquire); + for (size_t i = 0; i < batch; ++i) { + output[i] = buf->load(t + i); + } + + return batch; } [[nodiscard]] size_t size() const noexcept { diff --git a/include/elio/runtime/scheduler.hpp b/include/elio/runtime/scheduler.hpp index cb462e8..b060c69 100644 --- a/include/elio/runtime/scheduler.hpp +++ b/include/elio/runtime/scheduler.hpp @@ -261,11 +261,20 @@ class scheduler { } std::vector> workers_; - std::atomic num_threads_; + + // Frequently-read fields on their own cache line to avoid false sharing + // with the spawn counter and the slow-path workers_mutex_. + alignas(64) std::atomic num_threads_; std::atomic running_; std::atomic paused_; - std::atomic spawn_index_; - mutable std::mutex workers_mutex_; + + // spawn_index_ is incremented on every spawn(); isolate it so modifications + // don't invalidate the num_threads_/running_ cache line on other cores. + alignas(64) std::atomic spawn_index_; + + // workers_mutex_ is only touched on slow-path resize operations; + // keep it away from the hot read fields above. + alignas(64) mutable std::mutex workers_mutex_; wait_strategy wait_strategy_; static inline thread_local scheduler* current_scheduler_ = nullptr; diff --git a/include/elio/sync/primitives.hpp b/include/elio/sync/primitives.hpp index df50675..d65f6ac 100644 --- a/include/elio/sync/primitives.hpp +++ b/include/elio/sync/primitives.hpp @@ -26,8 +26,16 @@ namespace elio::sync { /// Coroutine-aware mutex /// Unlike std::mutex, this suspends the coroutine instead of blocking the thread /// -/// Optimized with atomic fast path for try_lock - avoids mutex acquisition -/// in the uncontended case for ~10x performance improvement. +/// Lock-free implementation using an intrusive LIFO waiter stack. +/// A single atomic pointer encodes the full lock state: +/// nullptr — unlocked +/// (void*)this — locked, no waiters (LOCKED_NO_WAITERS sentinel) +/// — locked, head of intrusive LIFO waiter stack +/// +/// The uncontended fast path is a single CAS (~3 cycles) with no heap +/// allocation and no OS mutex. On contention, waiters chain themselves +/// into a lock-free LIFO stack; unlock pops the head and re-schedules it +/// via the coroutine scheduler. class mutex { public: mutex() = default; @@ -39,84 +47,114 @@ class mutex { mutex(mutex&&) = delete; mutex& operator=(mutex&&) = delete; - /// Lock awaitable + /// Lock awaitable — lives in the coroutine frame for the duration of a + /// co_await m.lock() expression, so it is safe to store 'this' in the + /// mutex's intrusive waiter list. class lock_awaitable { public: - explicit lock_awaitable(mutex& m) : mutex_(m) {} + explicit lock_awaitable(mutex& m) noexcept : mutex_(m) {} bool await_ready() const noexcept { - // Try to acquire without waiting using atomic fast path return mutex_.try_lock(); } - bool await_suspend(std::coroutine_handle<> awaiter) noexcept { - std::lock_guard guard(mutex_.internal_mutex_); - - // Double-check after acquiring internal lock - // Use relaxed here since we hold the mutex - if (!mutex_.locked_.load(std::memory_order_relaxed)) { - mutex_.locked_.store(true, std::memory_order_relaxed); - return false; // Don't suspend, we got the lock + /// Either acquires the lock inline (returns false = do not suspend) or + /// pushes this awaitable onto the mutex's LIFO waiter stack and returns + /// true (suspend). Loops until one of these two outcomes is achieved + /// via lock-free CAS. + bool await_suspend(std::coroutine_handle<> h) noexcept { + handle_ = h; + void* old_state = mutex_.state_.load(std::memory_order_acquire); + while (true) { + if (old_state == nullptr) { + // Unlocked — try to acquire inline + if (mutex_.state_.compare_exchange_weak( + old_state, mutex_.locked_no_waiters(), + std::memory_order_acquire, + std::memory_order_relaxed)) { + return false; // acquired, do not suspend + } + // CAS failed, old_state refreshed — retry + } else { + // Locked — push this awaitable onto the LIFO stack + next_ = (old_state == mutex_.locked_no_waiters()) + ? nullptr + : static_cast(old_state); + if (mutex_.state_.compare_exchange_weak( + old_state, this, + std::memory_order_release, + std::memory_order_relaxed)) { + return true; // enqueued, suspend + } + // CAS failed, old_state refreshed — retry + } } - - // Add to wait queue - mutex_.waiters_.push(awaiter); - return true; // Suspend } void await_resume() const noexcept {} private: + friend class mutex; mutex& mutex_; + lock_awaitable* next_{nullptr}; // intrusive LIFO linkage + std::coroutine_handle<> handle_; // handle to resume on unlock }; /// Acquire the mutex - auto lock() { - return lock_awaitable(*this); - } + [[nodiscard]] auto lock() noexcept { return lock_awaitable(*this); } - /// Try to acquire the mutex without waiting - /// Lock-free fast path using atomic CAS - no mutex acquisition needed + /// Try to acquire the mutex without waiting (lock-free, single CAS) bool try_lock() noexcept { - bool expected = false; - return locked_.compare_exchange_strong(expected, true, - std::memory_order_acquire, std::memory_order_relaxed); + void* expected = nullptr; + return state_.compare_exchange_strong( + expected, locked_no_waiters(), + std::memory_order_acquire, + std::memory_order_relaxed); } /// Release the mutex - void unlock() { - std::coroutine_handle<> to_resume; - - { - std::lock_guard guard(internal_mutex_); - - if (!waiters_.empty()) { - // Wake up next waiter - to_resume = waiters_.front(); - waiters_.pop(); - // Lock remains held by the woken coroutine (locked_ stays true) - } else { - // No waiters - release the lock - locked_.store(false, std::memory_order_release); + void unlock() noexcept { + void* state = state_.load(std::memory_order_relaxed); + + if (state == locked_no_waiters()) { + // Fast path: no waiters — just release + if (state_.compare_exchange_strong( + state, nullptr, + std::memory_order_release, + std::memory_order_relaxed)) { + return; } + // A waiter pushed itself between our load and CAS; reload + state = state_.load(std::memory_order_acquire); } - // Re-schedule the waiter through the scheduler instead of resuming directly - // This avoids deep recursion and ownership confusion - if (to_resume) { - runtime::schedule_handle(to_resume); - } + // Pop head waiter and transfer lock ownership to it (LIFO) + auto* head = static_cast(state); + void* next_state = (head->next_ == nullptr) + ? locked_no_waiters() + : static_cast(head->next_); + state_.store(next_state, std::memory_order_release); + + // Schedule the waiter — it now holds the lock + runtime::schedule_handle(head->handle_); } /// Check if mutex is currently locked bool is_locked() const noexcept { - return locked_.load(std::memory_order_acquire); + return state_.load(std::memory_order_acquire) != nullptr; } private: - mutable std::mutex internal_mutex_; - std::atomic locked_{false}; - std::queue> waiters_; + /// Sentinel value meaning "locked but no waiters". + /// Uses the mutex's own address — guaranteed to differ from any + /// lock_awaitable* (awaitables live in coroutine frames, not inside mutexes). + void* locked_no_waiters() const noexcept { + return const_cast(static_cast(this)); + } + + // Single atomic encodes the full state (see class-level comment). + // No separate std::mutex or std::queue needed. + std::atomic state_{nullptr}; }; /// RAII lock guard for coroutine mutex @@ -379,8 +417,13 @@ class shared_mutex { } private: - mutable std::mutex internal_mutex_; - std::atomic state_{0}; // Packed: [writer_waiting:1][writer_active:1][readers:62] + // state_ is the hot field: read on every lock_shared() fast path, + // and written on every reader acquire/release. Keeping it isolated + // avoids false sharing with the slow-path internal_mutex_. + alignas(64) std::atomic state_{0}; // Packed: [writer_waiting:1][writer_active:1][readers:62] + + // slow-path fields: only accessed under internal_mutex_ + alignas(64) mutable std::mutex internal_mutex_; size_t pending_writers_ = 0; // Count of pending writers (for WRITER_WAITING flag management) std::queue> reader_waiters_; std::queue> writer_waiters_;