From e4253a3c60c2b2735aee23d775d3ceb2f3bee780 Mon Sep 17 00:00:00 2001 From: Coldwings Date: Tue, 10 Mar 2026 16:57:04 +0800 Subject: [PATCH] performance boost --- include/elio/coro/cancel_token.hpp | 10 ++-- include/elio/coro/frame_allocator.hpp | 13 ++++-- include/elio/coro/task_handle.hpp | 20 ++++++++ include/elio/io/io_uring_backend.hpp | 43 ++++++++++++----- include/elio/runtime/mpsc_queue.hpp | 2 +- include/elio/runtime/scheduler.hpp | 26 +++++------ include/elio/runtime/worker_thread.hpp | 64 +++++++++++++++++++++----- include/elio/sync/primitives.hpp | 2 +- 8 files changed, 133 insertions(+), 47 deletions(-) diff --git a/include/elio/coro/cancel_token.hpp b/include/elio/coro/cancel_token.hpp index 308fb93..1dfef9f 100644 --- a/include/elio/coro/cancel_token.hpp +++ b/include/elio/coro/cancel_token.hpp @@ -26,13 +26,13 @@ struct cancel_state { uint64_t next_id = 1; uint64_t add_callback(std::function cb) { - std::lock_guard lock(mutex); + std::unique_lock lock(mutex); if (cancelled.load(std::memory_order_relaxed)) { - // Already cancelled, invoke immediately outside lock - // Need to release lock first - mutex.unlock(); + // Already cancelled: unlock first so the callback runs outside the lock + // Must be unlock(), not release(): release() detaches the mutex but leaves it locked + lock.unlock(); cb(); - mutex.lock(); + // Not re-acquired: unique_lock's destructor does nothing once unlocked return 0; } uint64_t id = next_id++; diff --git a/include/elio/coro/frame_allocator.hpp b/include/elio/coro/frame_allocator.hpp index 204ba8d..43fb44e 100644 --- a/include/elio/coro/frame_allocator.hpp +++ b/include/elio/coro/frame_allocator.hpp @@ -6,6 +6,7 @@ #include #include #include +#include namespace elio::coro { @@ -216,6 +217,10 @@ class frame_allocator { // Pool registry for cross-thread access static constexpr size_t MAX_POOLS = 256; + // Registry entries - atomic for lock-free reads, protected by mutex for writes + static inline std::atomic pool_registry_[MAX_POOLS]{}; + static inline std::mutex registry_mutex_; // Protects unregister operations + 
static void register_pool(frame_allocator* pool) noexcept { uint32_t id = pool->pool_id_; if (id < MAX_POOLS) { @@ -226,10 +231,15 @@ class frame_allocator { static void unregister_pool(frame_allocator* pool) noexcept { uint32_t id = pool->pool_id_; if (id < MAX_POOLS) { + // Serializes concurrent unregister_pool() calls on registry_mutex_. + // NOTE(review): get_pool_by_id() loads the slot without taking this mutex, so a + // lookup can still obtain the pointer just before this store - confirm pool lifetime + std::lock_guard lock(registry_mutex_); pool_registry_[id].store(nullptr, std::memory_order_release); } } + // Get pool by ID - returns nullptr if pool was unregistered static frame_allocator* get_pool_by_id(uint32_t id) noexcept { if (id < MAX_POOLS) { return pool_registry_[id].load(std::memory_order_acquire); @@ -247,9 +257,6 @@ class frame_allocator { // Global pool ID counter static inline std::atomic next_pool_id_{0}; - - // Global pool registry for cross-thread lookups - static inline std::array, MAX_POOLS> pool_registry_{}; }; } // namespace elio::coro diff --git a/include/elio/coro/task_handle.hpp b/include/elio/coro/task_handle.hpp index b6b7607..33743ce 100644 --- a/include/elio/coro/task_handle.hpp +++ b/include/elio/coro/task_handle.hpp @@ -124,9 +124,18 @@ struct task_state { } bool set_waiter(std::coroutine_handle<> h) noexcept { + // Fast path: check if already done before trying to set waiter + // This avoids the race of setting a waiter on an already-completed task + if (is_done()) { + return false; + } + + // Try to atomically set the waiter void* expected = nullptr; if (waiter_.compare_exchange_strong(expected, h.address(), std::memory_order_release, std::memory_order_acquire)) { + // Double-check if task completed between our initial check and CAS + // If so, we need to notify the waiter we just set if (is_done()) { void* addr = waiter_.exchange(nullptr, std::memory_order_acq_rel); if (addr) { @@ -136,6 +145,7 @@ struct task_state { } return true; } + // Another waiter already set (shouldn't happen with single 
await) return false; } @@ -201,9 +211,18 @@ struct task_state { } bool set_waiter(std::coroutine_handle<> h) noexcept { + // Fast path: check if already done before trying to set waiter + // This avoids the race of setting a waiter on an already-completed task + if (is_done()) { + return false; + } + + // Try to atomically set the waiter void* expected = nullptr; if (waiter_.compare_exchange_strong(expected, h.address(), std::memory_order_release, std::memory_order_acquire)) { + // Double-check if task completed between our initial check and CAS + // If so, we need to notify the waiter we just set if (is_done()) { void* addr = waiter_.exchange(nullptr, std::memory_order_acq_rel); if (addr) { @@ -213,6 +232,7 @@ struct task_state { } return true; } + // Another waiter already set (shouldn't happen with single await) return false; } diff --git a/include/elio/io/io_uring_backend.hpp b/include/elio/io/io_uring_backend.hpp index 4e5cc64..2e4fff4 100644 --- a/include/elio/io/io_uring_backend.hpp +++ b/include/elio/io/io_uring_backend.hpp @@ -20,17 +20,30 @@ namespace elio::io { -/// Map to track handles being resumed for thread-safe check-and-resume -/// Uses a mutex for simplicity - a handle address is mapped to a bool flag -/// indicating if the resume is in progress -inline std::unordered_map>& get_resume_tracking_map() { - static std::unordered_map> map; - return map; +/// Number of shards for resume tracking map to reduce lock contention +/// Using a power of 2 allows fast modulo via bitwise AND +static constexpr size_t kResumeTrackingShards = 16; + +/// Get the shard index for a given user_data pointer +inline size_t get_resume_tracking_shard(const void* user_data) { + // Use pointer value to distribute across shards + // Mix bits to get better distribution + uintptr_t addr = reinterpret_cast(user_data); + return (addr ^ (addr >> 8) ^ (addr >> 16)) & (kResumeTrackingShards - 1); } -inline std::mutex& get_resume_tracking_mutex() { - static std::mutex mutex; - return 
mutex; +/// Per-shard tracking data +struct resume_tracking_shard { + std::unordered_map> map; + std::mutex mutex; +}; + +/// Get the sharded resume tracking maps and mutexes +/// Using sharding to reduce lock contention under high I/O load +inline std::array& +get_resume_tracking_shards() { + static std::array shards; + return shards; } /// io_uring backend implementation @@ -410,11 +423,17 @@ class io_uring_backend : public io_backend { auto handle = std::coroutine_handle<>::from_address(user_data); if (handle) { - // Thread-safe check-and-resume using atomic operations. + // Thread-safe check-and-resume using sharded atomic operations. + // Uses per-shard locking to reduce lock contention under high I/O load. // Try to atomically claim this resume to prevent use-after-free. // If another thread is already resuming this handle, skip. - auto& tracking_map = get_resume_tracking_map(); - std::lock_guard lock(get_resume_tracking_mutex()); + + // Get the shard for this user_data to reduce contention + size_t shard = get_resume_tracking_shard(user_data); + auto& shards = get_resume_tracking_shards(); + + std::lock_guard lock(shards[shard].mutex); + auto& tracking_map = shards[shard].map; // Get or create the atomic flag for this handle auto it = tracking_map.find(user_data); diff --git a/include/elio/runtime/mpsc_queue.hpp b/include/elio/runtime/mpsc_queue.hpp index c212245..099601d 100644 --- a/include/elio/runtime/mpsc_queue.hpp +++ b/include/elio/runtime/mpsc_queue.hpp @@ -39,7 +39,7 @@ class mpsc_queue { mpsc_queue& operator=(const mpsc_queue&) = delete; mpsc_queue(mpsc_queue&&) = delete; mpsc_queue& operator=(mpsc_queue&&) = delete; - + /// Push an item (multiple producers allowed) - lock-free, wait-free for non-full queue bool push(T* item) noexcept { size_t pos = tail_.load(std::memory_order_relaxed); diff --git a/include/elio/runtime/scheduler.hpp b/include/elio/runtime/scheduler.hpp index e275ea0..cb462e8 100644 --- a/include/elio/runtime/scheduler.hpp +++ 
b/include/elio/runtime/scheduler.hpp @@ -309,11 +309,11 @@ inline void worker_thread::stop() { inline void worker_thread::drain_remaining_tasks() noexcept { // First drain inbox to deque void* addr; - while ((addr = inbox_.pop()) != nullptr) { - queue_.push(addr); + while ((addr = inbox_->pop()) != nullptr) { + queue_->push(addr); } // Then destroy all tasks in the deque - while ((addr = queue_.pop()) != nullptr) { + while ((addr = queue_->pop()) != nullptr) { auto handle = std::coroutine_handle<>::from_address(addr); if (handle) { handle.destroy(); @@ -325,11 +325,11 @@ inline void worker_thread::drain_remaining_tasks() noexcept { inline void worker_thread::redistribute_tasks(scheduler* sched) noexcept { // First drain inbox to deque void* addr; - while ((addr = inbox_.pop()) != nullptr) { - queue_.push(addr); + while ((addr = inbox_->pop()) != nullptr) { + queue_->push(addr); } // Then redistribute all tasks to active workers - while ((addr = queue_.pop()) != nullptr) { + while ((addr = queue_->pop()) != nullptr) { auto handle = std::coroutine_handle<>::from_address(addr); if (handle && !handle.done()) { // Respawn to an active worker @@ -344,8 +344,8 @@ inline void worker_thread::drain_inbox() noexcept { // Drain MPSC inbox into local Chase-Lev deque // Drain all available items to ensure tasks aren't stuck in inbox void* item; - while ((item = inbox_.pop()) != nullptr) { - queue_.push(item); + while ((item = inbox_->pop()) != nullptr) { + queue_->push(item); } } @@ -388,17 +388,17 @@ inline std::coroutine_handle<> worker_thread::get_next_task() noexcept { // In single-worker mode, use pop_local() to skip seq_cst fence // Pass !single_worker as allow_concurrent_steals - pop_local() will fall back // to pop() if there could be concurrent stealers - void* addr = queue_.pop_local(!single_worker); + void* addr = queue_->pop_local(!single_worker); if (addr) { needs_sync_ = false; // Local task, no sync needed return std::coroutine_handle<>::from_address(addr); } - + 
// Local queue empty - drain any externally submitted tasks from inbox drain_inbox(); - + // Try local deque again after draining inbox - addr = queue_.pop_local(!single_worker); + addr = queue_->pop_local(!single_worker); if (addr) { needs_sync_ = true; // Came from inbox, needs sync return std::coroutine_handle<>::from_address(addr); @@ -500,7 +500,7 @@ inline void worker_thread::poll_io_when_idle() { // Optional spinning phase (if configured via wait_strategy) if (strategy_.spin_iterations > 0) { for (size_t i = 0; i < strategy_.spin_iterations; ++i) { - if (inbox_.size_approx() > 0) { + if (inbox_->size_approx() > 0) { idle_.store(false, std::memory_order_relaxed); return; } diff --git a/include/elio/runtime/worker_thread.hpp b/include/elio/runtime/worker_thread.hpp index 54a9c8c..6d30aaa 100644 --- a/include/elio/runtime/worker_thread.hpp +++ b/include/elio/runtime/worker_thread.hpp @@ -31,8 +31,8 @@ class worker_thread { wait_strategy strategy = wait_strategy::blocking()) : scheduler_(sched) , worker_id_(worker_id) - , queue_() - , inbox_() + , queue_(std::make_unique>()) + , inbox_(std::make_unique>()) , running_(false) , tasks_executed_(0) , strategy_(strategy) @@ -45,8 +45,48 @@ class worker_thread { worker_thread(const worker_thread&) = delete; worker_thread& operator=(const worker_thread&) = delete; - worker_thread(worker_thread&&) = delete; - worker_thread& operator=(worker_thread&&) = delete; + + worker_thread(worker_thread&& other) noexcept + : scheduler_(other.scheduler_) + , worker_id_(other.worker_id_) + , queue_(std::move(other.queue_)) + , inbox_(std::move(other.inbox_)) + , thread_(std::move(other.thread_)) + , running_(other.running_.load(std::memory_order_relaxed)) + , tasks_executed_(other.tasks_executed_.load(std::memory_order_relaxed)) + , idle_(other.idle_.load(std::memory_order_relaxed)) + , last_task_time_(other.last_task_time_.load(std::memory_order_relaxed)) + , needs_sync_(other.needs_sync_) + , strategy_(std::move(other.strategy_)) + 
, io_context_(std::move(other.io_context_)) { + // Reset other's pointers to empty states to prevent double-free + other.queue_ = std::make_unique>(); + other.inbox_ = std::make_unique>(); + } + + worker_thread& operator=(worker_thread&& other) noexcept { + if (this != &other) { + stop(); + + scheduler_ = other.scheduler_; + worker_id_ = other.worker_id_; + queue_ = std::move(other.queue_); + inbox_ = std::move(other.inbox_); + thread_ = std::move(other.thread_); + running_.store(other.running_.load(std::memory_order_relaxed), std::memory_order_relaxed); + tasks_executed_.store(other.tasks_executed_.load(std::memory_order_relaxed), std::memory_order_relaxed); + idle_.store(other.idle_.load(std::memory_order_relaxed), std::memory_order_relaxed); + last_task_time_.store(other.last_task_time_.load(std::memory_order_relaxed), std::memory_order_relaxed); + needs_sync_ = other.needs_sync_; + strategy_ = std::move(other.strategy_); + io_context_ = std::move(other.io_context_); + + // Reset other's pointers to empty states to prevent double-free + other.queue_ = std::make_unique>(); + other.inbox_ = std::make_unique>(); + } + return *this; + } void start(); void stop(); @@ -63,7 +103,7 @@ class worker_thread { if (!handle) [[unlikely]] return; // Try fast path: push to lock-free inbox - if (inbox_.push(handle.address())) [[likely]] { + if (inbox_->push(handle.address())) [[likely]] { // Lazy wake: only wake if worker is idle (waiting for work) // This avoids unnecessary wake syscalls when worker is busy // Use relaxed load - occasional extra wake is fine, we optimize for the common case @@ -84,7 +124,7 @@ class worker_thread { std::this_thread::yield(); #endif } - if (inbox_.push(handle.address())) { + if (inbox_->push(handle.address())) { // Lazy wake: only wake if worker is idle if (idle_.load(std::memory_order_relaxed)) { wake(); @@ -99,7 +139,7 @@ class worker_thread { /// Schedule a task from owner thread - pushes directly to local deque void 
schedule_local(std::coroutine_handle<> handle) { if (!handle) [[unlikely]] return; - queue_.push(handle.address()); + queue_->push(handle.address()); } [[nodiscard]] size_t tasks_executed() const noexcept { @@ -107,20 +147,20 @@ class worker_thread { } [[nodiscard]] size_t queue_size() const noexcept { - return queue_.size() + inbox_.size_approx(); + return queue_->size() + inbox_->size_approx(); } [[nodiscard]] std::coroutine_handle<> steal_task() noexcept { // Don't allow stealing from stopped workers if (!running_.load(std::memory_order_acquire)) return nullptr; - void* addr = queue_.steal(); + void* addr = queue_->steal(); return addr ? std::coroutine_handle<>::from_address(addr) : nullptr; } /// Steal multiple tasks at once for better throughput template size_t steal_batch(std::array& output) noexcept { - return queue_.steal_batch(output); + return queue_->steal_batch(output); } [[nodiscard]] bool is_running() const noexcept { @@ -182,8 +222,8 @@ class worker_thread { scheduler* scheduler_; size_t worker_id_; - chase_lev_deque queue_; // Owner's local deque (SPMC) - mpsc_queue inbox_; // External submissions (MPSC) + std::unique_ptr> queue_; // Owner's local deque (SPMC) + std::unique_ptr> inbox_; // External submissions (MPSC) std::thread thread_; std::atomic running_; std::atomic tasks_executed_; diff --git a/include/elio/sync/primitives.hpp b/include/elio/sync/primitives.hpp index 48edf96..df50675 100644 --- a/include/elio/sync/primitives.hpp +++ b/include/elio/sync/primitives.hpp @@ -672,7 +672,7 @@ class channel { other.closed_ = true; } - channel& operator=(channel&& other) noexcept { + channel& operator=(channel&& other) noexcept(false) { if (this != &other) { close(); capacity_ = other.capacity_;