Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ option(ELIO_ENABLE_HTTP "Enable HTTP client/server support (requires TLS)" ${ELI
option(ELIO_ENABLE_HTTP2 "Enable HTTP/2 support (requires nghttp2)" ${ELIO_IS_TOP_LEVEL})
option(ELIO_ENABLE_DEVELOPER_WARNINGS "Enable strict warning flags for Elio tests/examples" ${ELIO_IS_TOP_LEVEL})
option(ELIO_WARNINGS_AS_ERRORS "Treat warnings as errors for Elio tests/examples" ${ELIO_IS_TOP_LEVEL})
option(ELIO_ENABLE_DEBUG_METADATA "Enable coroutine debug metadata" ON)

# Platform check - Linux only
if(NOT UNIX OR APPLE)
Expand All @@ -34,11 +35,18 @@ set(CMAKE_EXPORT_COMPILE_COMMANDS ON)

# Header-only library interface
add_library(elio INTERFACE)
target_include_directories(elio INTERFACE
target_include_directories(elio INTERFACE
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include>
)

# Debug metadata option: always export ELIO_ENABLE_DEBUG_METADATA as 1 or 0 so
# that consumers can test it with a plain `#if`.
target_compile_definitions(elio INTERFACE
    ELIO_ENABLE_DEBUG_METADATA=$<BOOL:${ELIO_ENABLE_DEBUG_METADATA}>
)

# Dependencies via FetchContent
include(FetchContent)

Expand Down
113 changes: 80 additions & 33 deletions include/elio/coro/frame_allocator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,21 @@ namespace elio::coro {
/// Thread-local free-list based frame allocator for small coroutine frames
/// Dramatically reduces allocation overhead for frequently created/destroyed coroutines
///
/// Design: Each allocated frame has a hidden header storing the source pool ID.
/// Design: Each allocated frame has a hidden header storing the source pool ID and size class.
/// When deallocated on a different thread, the frame is returned via an MPSC queue
/// to its source pool. This handles work-stealing scenarios where coroutines
/// are allocated on thread A but deallocated on thread B.
///
/// Size Classes: Multiple pools for different frame sizes (32, 64, 128, 256 bytes)
/// reduce memory waste for small frames while maintaining allocation performance.
///
/// Note: Under sanitizers, pooling is disabled to allow proper leak/error detection.
class frame_allocator {
public:
// Support frames up to 256 bytes (covers most simple tasks)
// Actual allocation includes header, so user-visible size is MAX_FRAME_SIZE
static constexpr size_t MAX_FRAME_SIZE = 256;
static constexpr size_t POOL_SIZE = 1024;
// Size classes for different frame sizes
static constexpr size_t SIZE_CLASSES[] = {32, 64, 128, 256};
static constexpr size_t NUM_SIZE_CLASSES = 4;
static constexpr size_t POOL_SIZE = 512; // Per size class
static constexpr size_t REMOTE_QUEUE_BATCH = 64; // Process remote returns in batches

// Detect sanitizers: GCC uses __SANITIZE_*, Clang uses __has_feature
Expand All @@ -58,45 +61,48 @@ class frame_allocator {
}
#else
static void* allocate(size_t size) {
if (size <= MAX_FRAME_SIZE) {
size_t sc = find_size_class(size);
if (sc < NUM_SIZE_CLASSES) {
auto& alloc = instance();

// First drain a bounded batch of blocks returned by other threads
alloc.reclaim_remote_returns();

if (alloc.free_count_ > 0) {
void* block = alloc.pool_[--alloc.free_count_];
if (alloc.free_count_[sc] > 0) {
void* block = alloc.pool_[sc][--alloc.free_count_[sc]];
// Update header to reflect current pool ownership
// This is important because blocks may have been returned from remote threads
auto* header = static_cast<block_header*>(block);
header->source_pool_id = alloc.pool_id_;
header->size_class = static_cast<uint8_t>(sc);
return block_to_user(block);
}

// Allocate new block with header
void* block = ::operator new(ALLOC_BLOCK_SIZE);
void* block = ::operator new(alloc_block_size(sc));
auto* header = static_cast<block_header*>(block);
header->source_pool_id = alloc.pool_id_;
header->size_class = static_cast<uint8_t>(sc);
header->next.store(nullptr, std::memory_order_relaxed);
return block_to_user(block);
}
// Fall back to standard allocation for large frames (no header)
// Fall back to standard allocation for large frames
return ::operator new(size);
}

static void deallocate(void* ptr, size_t size) noexcept {
if (size <= MAX_FRAME_SIZE) {
size_t sc = find_size_class(size);
if (sc < NUM_SIZE_CLASSES) {
void* block = user_to_block(ptr);
auto* header = static_cast<block_header*>(block);
auto& alloc = instance();

// Fast path: same thread - return directly to local pool
if (header->source_pool_id == alloc.pool_id_) {
if (alloc.free_count_ < POOL_SIZE) {
alloc.pool_[alloc.free_count_++] = block;
if (alloc.free_count_[sc] < POOL_SIZE) {
alloc.pool_[sc][alloc.free_count_[sc]++] = block;
return;
}
// Pool full, delete the block (not the user pointer!)
// Pool full, delete the block
::operator delete(block);
return;
} else {
Expand All @@ -119,13 +125,33 @@ class frame_allocator {
private:
// Block header stored before user data
struct block_header {
uint32_t source_pool_id; // ID of the pool that allocated this block
std::atomic<block_header*> next; // For MPSC queue linkage
uint32_t source_pool_id; // ID of the pool that allocated this block
uint8_t size_class; // Size class index (0-3)
std::atomic<block_header*> next; // For MPSC queue linkage
};

// Total block size including header, aligned for user data
// Header size
static constexpr size_t HEADER_SIZE = sizeof(block_header);
static constexpr size_t ALLOC_BLOCK_SIZE = HEADER_SIZE + MAX_FRAME_SIZE;

// Returns the index of the smallest size class able to hold `size` bytes,
// or NUM_SIZE_CLASSES when `size` is larger than every class.
static size_t find_size_class(size_t size) noexcept {
    size_t idx = 0;
    // Skip every class that is too small for the request
    while (idx < NUM_SIZE_CLASSES && SIZE_CLASSES[idx] < size) {
        ++idx;
    }
    return idx;
}

// Usable byte capacity of the size class at index `idx`.
// `idx` is not range-checked here.
static size_t size_class_size(size_t idx) noexcept {
    const size_t capacity = SIZE_CLASSES[idx];
    return capacity;
}

// Number of bytes to request for one block of the given size class:
// the class's usable capacity plus the hidden header in front of it.
static size_t alloc_block_size(size_t size_class_idx) noexcept {
    const size_t payload_bytes = SIZE_CLASSES[size_class_idx];
    return payload_bytes + HEADER_SIZE;
}

// Convert between block (with header) and user pointer
static void* block_to_user(void* block) noexcept {
Expand All @@ -137,10 +163,17 @@ class frame_allocator {
}

frame_allocator()
: free_count_(0)
, pool_id_(next_pool_id_.fetch_add(1, std::memory_order_relaxed))
, remote_head_{0, {nullptr}} // Initialize dummy head: pool_id=0, next=nullptr
: pool_id_(next_pool_id_.fetch_add(1, std::memory_order_relaxed))
, remote_head_{}
, remote_tail_(&remote_head_) {
// Initialize remote_head_ fields after default construction
remote_head_.source_pool_id = 0;
remote_head_.size_class = 0;
remote_head_.next.store(nullptr, std::memory_order_relaxed);
// Initialize free counts to 0
for (size_t i = 0; i < NUM_SIZE_CLASSES; ++i) {
free_count_[i] = 0;
}
// Register this pool for cross-thread access
register_pool(this);
}
Expand All @@ -153,8 +186,10 @@ class frame_allocator {
reclaim_all_remote_returns();

// Free all cached frames when thread exits
for (size_t i = 0; i < free_count_; ++i) {
::operator delete(pool_[i]);
for (size_t sc = 0; sc < NUM_SIZE_CLASSES; ++sc) {
for (size_t i = 0; i < free_count_[sc]; ++i) {
::operator delete(pool_[sc][i]);
}
}
}

Expand All @@ -168,14 +203,14 @@ class frame_allocator {
prev->next.store(header, std::memory_order_release);
}

// Called by owner thread to reclaim remote returns
// Called by owner thread to reclaim remote returns for all size classes
void reclaim_remote_returns() noexcept {
// Quick check without full synchronization
block_header* head = remote_head_.next.load(std::memory_order_acquire);
if (!head) return;

size_t count = 0;
while (head && count < REMOTE_QUEUE_BATCH && free_count_ < POOL_SIZE) {
while (head && count < REMOTE_QUEUE_BATCH) {
block_header* next = head->next.load(std::memory_order_acquire);

// If next is null but tail points elsewhere, the producer is in the
Expand All @@ -193,10 +228,21 @@ class frame_allocator {
if (!next) break;
}

pool_[free_count_++] = head;
remote_head_.next.store(next, std::memory_order_release);
// Add to appropriate size class pool
size_t sc = head->size_class;
if (sc < NUM_SIZE_CLASSES && free_count_[sc] < POOL_SIZE) {
pool_[sc][free_count_[sc]++] = head;
remote_head_.next.store(next, std::memory_order_release);
++count;
} else if (sc >= NUM_SIZE_CLASSES) {
// Invalid size class - delete the block
::operator delete(head);
remote_head_.next.store(next, std::memory_order_release);
} else {
// Pool full - leave it in the queue for later
break;
}
head = next;
++count;
}
}

Expand All @@ -218,8 +264,9 @@ class frame_allocator {
if (!next) break;
}

if (free_count_ < POOL_SIZE) {
pool_[free_count_++] = head;
size_t sc = head->size_class;
if (sc < NUM_SIZE_CLASSES && free_count_[sc] < POOL_SIZE) {
pool_[sc][free_count_[sc]++] = head;
} else {
::operator delete(head);
}
Expand Down Expand Up @@ -267,8 +314,8 @@ class frame_allocator {
return nullptr;
}

std::array<void*, POOL_SIZE> pool_;
size_t free_count_;
std::array<std::array<void*, POOL_SIZE>, NUM_SIZE_CLASSES> pool_;
std::array<size_t, NUM_SIZE_CLASSES> free_count_;
uint32_t pool_id_;

// MPSC queue for remote returns (dummy head node pattern)
Expand Down
47 changes: 34 additions & 13 deletions include/elio/coro/promise_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class id_allocator {
/// Base class for all coroutine promise types
/// Implements lightweight virtual stack tracking via thread-local intrusive list
///
/// Debug support:
/// Debug support (when ELIO_ENABLE_DEBUG_METADATA=1):
/// - Each frame has a unique ID for identification
/// - Source location can be set for debugging
/// - State tracking (created/running/suspended/completed/failed)
Expand All @@ -88,14 +88,16 @@ class promise_base {
/// Construct the promise and push it onto the calling thread's virtual stack.
/// The frame that was current on this thread becomes this frame's parent,
/// and this frame becomes the new current frame.
promise_base() noexcept
: frame_magic_(FRAME_MAGIC)
, parent_(current_frame_)
#if ELIO_ENABLE_DEBUG_METADATA
// Debug fields exist only when debug metadata is compiled in
, debug_state_(coroutine_state::created)
, debug_worker_id_(static_cast<uint32_t>(-1))
, debug_id_(0) // Lazy allocation - only allocated when id() is called
#endif
, affinity_(NO_AFFINITY)
{
// current_frame_ is thread_local, so this only affects the constructing thread
current_frame_ = this;
}

/// Pop this frame from the virtual stack by making its recorded parent the
/// current frame of the destroying thread.
// NOTE(review): the assignment is unconditional, so this presumably assumes
// frames are destroyed in LIFO order on the thread whose current_frame_ they
// occupy - confirm this holds when coroutines migrate between workers.
~promise_base() noexcept {
current_frame_ = parent_;
}
Expand All @@ -107,13 +109,15 @@ class promise_base {

/// Capture the in-flight exception so it can be retrieved later through
/// exception(). With debug metadata enabled, also marks the frame as failed.
void unhandled_exception() noexcept {
exception_ = std::current_exception();
#if ELIO_ENABLE_DEBUG_METADATA
debug_state_ = coroutine_state::failed;
#endif
}

/// @return A copy of the stored exception pointer; it is null unless
///         unhandled_exception() has stored one.
[[nodiscard]] std::exception_ptr exception() const noexcept {
return exception_;
}

/// @return The frame that was current on the constructing thread when this
///         promise was created, or nullptr if there was none.
[[nodiscard]] promise_base* parent() const noexcept {
return parent_;
}
Expand All @@ -122,7 +126,8 @@ class promise_base {
return current_frame_;
}

// Debug accessors
// Debug accessors (available only when debug metadata is enabled)
#if ELIO_ENABLE_DEBUG_METADATA
[[nodiscard]] uint64_t frame_magic() const noexcept { return frame_magic_; }
[[nodiscard]] const debug_location& location() const noexcept { return debug_location_; }
[[nodiscard]] coroutine_state state() const noexcept { return debug_state_; }
Expand All @@ -149,39 +154,55 @@ class promise_base {
void set_worker_id(uint32_t id) noexcept {
debug_worker_id_ = id;
}
#else
// Stub accessors when debug metadata is disabled.
// They keep the debug API callable without any per-frame debug storage:
// getters return fixed placeholder values and setters ignore their arguments.
/// Magic number stored at the start of the frame (present in both configurations).
[[nodiscard]] uint64_t frame_magic() const noexcept { return frame_magic_; }
/// Always 0: no ID is stored when debug metadata is disabled.
[[nodiscard]] uint64_t id() noexcept { return 0; }
/// Always static_cast<uint32_t>(-1), the same value the debug build uses as its initial worker ID.
[[nodiscard]] uint32_t worker_id() const noexcept { return static_cast<uint32_t>(-1); }
/// Always coroutine_state::running: the real state is not tracked in this configuration.
[[nodiscard]] coroutine_state state() const noexcept { return coroutine_state::running; }
/// Returns a reference to a shared, value-initialized debug_location.
[[nodiscard]] const debug_location& location() const noexcept {
static const debug_location empty{};
return empty;
}
/// No-op: arguments are discarded.
void set_location(const char*, const char*, uint32_t) noexcept {}
/// No-op: the state is discarded.
void set_state(coroutine_state) noexcept {}
/// No-op: the worker ID is discarded.
void set_worker_id(uint32_t) noexcept {}
#endif

// Thread-affinity accessors

/// Query which worker this vthread is pinned to.
/// @return The bound worker ID, or NO_AFFINITY when the vthread is unbound
[[nodiscard]] size_t affinity() const noexcept {
    return affinity_;
}

/// Pin this vthread to a worker.
/// @param worker_id Worker ID to bind to; passing NO_AFFINITY removes the binding
void set_affinity(size_t worker_id) noexcept {
    affinity_ = worker_id;
}

/// Tell whether this vthread is currently pinned to a worker.
/// @return true when the stored affinity differs from NO_AFFINITY
[[nodiscard]] bool has_affinity() const noexcept {
    return affinity_ != NO_AFFINITY;
}

/// Remove any binding so this vthread may migrate freely.
void clear_affinity() noexcept {
    affinity_ = NO_AFFINITY;
}

private:
// Magic number at start for debugger validation
uint64_t frame_magic_;

// Virtual stack tracking
promise_base* parent_;
std::exception_ptr exception_;

// Debug metadata

#if ELIO_ENABLE_DEBUG_METADATA
// Debug metadata (conditionally compiled)
debug_location debug_location_;
coroutine_state debug_state_;
uint32_t debug_worker_id_;
uint64_t debug_id_;

#endif

// Thread affinity: NO_AFFINITY means can migrate freely
size_t affinity_;

static inline thread_local promise_base* current_frame_ = nullptr;
};

Expand Down
Loading
Loading