diff --git a/.gitignore b/.gitignore index 87c2357..22d75a2 100644 --- a/.gitignore +++ b/.gitignore @@ -35,5 +35,5 @@ benchmark # Temporary files *.tmp .DS_Store -.qoder/ +.clangd __pycache__ diff --git a/CMakeLists.txt b/CMakeLists.txt index d654e56..1160504 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -216,6 +216,7 @@ install(PROGRAMS ) install(FILES tools/elio-gdb.py + tools/elio_lldb.py tools/elio-lldb.py DESTINATION share/elio ) diff --git a/README.md b/README.md index d10aefc..fe7bd36 100644 --- a/README.md +++ b/README.md @@ -188,7 +188,8 @@ elio:: tools/ // Debugging tools ├── elio-pstack // pstack-like CLI tool ├── elio-gdb.py // GDB Python extension -└── elio-lldb.py // LLDB Python extension +├── elio_lldb.py // LLDB import entrypoint +└── elio-lldb.py // LLDB implementation script ``` ### Virtual Stack @@ -269,7 +270,7 @@ gdb -ex 'source tools/elio-gdb.py' ./myapp (gdb) elio bt 42 # Show backtrace for vthread #42 # LLDB extension -lldb -o 'command script import tools/elio-lldb.py' ./myapp +lldb -o 'command script import tools/elio_lldb.py' ./myapp (lldb) elio list (lldb) elio bt ``` diff --git a/examples/debug_test.cpp b/examples/debug_test.cpp index 1168586..8b9fdde 100644 --- a/examples/debug_test.cpp +++ b/examples/debug_test.cpp @@ -4,7 +4,7 @@ /// coroutines that run concurrently and can be inspected using: /// - elio-pstack (command line) /// - GDB with elio-gdb.py -/// - LLDB with elio-lldb.py +/// - LLDB with elio_lldb.py (entrypoint wrapper) /// /// Usage: /// ./debug_test # Run normally @@ -135,6 +135,7 @@ coro::task async_main(int argc, char* argv[]) { std::cout << "Paused for debugger. Use one of:" << std::endl; std::cout << " elio-pstack " << getpid() << std::endl; std::cout << " gdb -p " << getpid() << " -ex 'source tools/elio-gdb.py' -ex 'elio bt'" << std::endl; + std::cout << " lldb -p " << getpid() << " -o 'command script import tools/elio_lldb.py' -o 'elio bt'" << std::endl; std::cout << std::endl; std::cout << "Press Ctrl+C to exit." << std::endl; std::cout << std::endl; diff --git a/examples/rpc_server_example.cpp b/examples/rpc_server_example.cpp index dddedef..ca33657 100644 --- a/examples/rpc_server_example.cpp +++ b/examples/rpc_server_example.cpp @@ -118,6 +118,11 @@ using Echo = ELIO_RPC_METHOD(5, EchoRequest, EchoResponse); // In-memory user store class UserStore { public: + struct UserListSnapshot { + std::vector users; + int32_t total_count; + }; + std::optional get_user(int32_t id) { std::lock_guard lock(mutex_); auto it = users_.find(id); @@ -135,22 +140,18 @@ class UserStore { return id; } - std::vector list_users(int32_t offset, int32_t limit) { + UserListSnapshot list_users_snapshot(int32_t offset, int32_t limit) { std::lock_guard lock(mutex_); - std::vector result; + UserListSnapshot snapshot; + snapshot.total_count = static_cast(users_.size()); int32_t count = 0; for (const auto& [id, user] : users_) { - if (count >= offset && result.size() < static_cast(limit)) { - result.push_back(user); + if (count >= offset && snapshot.users.size() < static_cast(limit)) { + snapshot.users.push_back(user); } ++count; } - return result; - } - - int32_t total_count() { - std::lock_guard lock(mutex_); - return static_cast(users_.size()); + return snapshot; } private: @@ -235,8 +236,9 @@ task server_main(uint16_t port, [[maybe_unused]] scheduler& sched) { server.register_method([](const ListUsersRequest& req) -> task { ListUsersResponse resp; - resp.users = g_user_store.list_users(req.offset, req.limit); - resp.total_count = g_user_store.total_count(); + auto snapshot = g_user_store.list_users_snapshot(req.offset, req.limit); + resp.users = std::move(snapshot.users); + resp.total_count = snapshot.total_count; co_return resp; }); diff --git a/examples/tcp_echo_client.cpp b/examples/tcp_echo_client.cpp index 7cd3471..f6c6c22 100644 --- a/examples/tcp_echo_client.cpp +++ b/examples/tcp_echo_client.cpp @@ -29,9 +29,14 @@ using namespace elio::net; /// Client coroutine - connects, sends messages, receives responses task client_main(std::string_view host, uint16_t port) { ELIO_LOG_INFO("Connecting to {}:{}...", host, port); - - // Connect to server - auto stream_result = co_await tcp_connect(host, port); + + auto resolved = co_await resolve_hostname(host, port); + if (!resolved) { + ELIO_LOG_ERROR("Resolve failed: {}", strerror(errno)); + co_return 1; + } + + auto stream_result = co_await tcp_connect(*resolved); if (!stream_result) { ELIO_LOG_ERROR("Connection failed: {}", strerror(errno)); @@ -79,8 +84,14 @@ task client_main(std::string_view host, uint16_t port) { /// Non-interactive benchmark mode task benchmark_main(std::string_view host, uint16_t port, int iterations) { ELIO_LOG_INFO("Connecting to {}:{} for benchmark...", host, port); - - auto stream_result = co_await tcp_connect(host, port); + + auto resolved = co_await resolve_hostname(host, port); + if (!resolved) { + ELIO_LOG_ERROR("Resolve failed: {}", strerror(errno)); + co_return 1; + } + + auto stream_result = co_await tcp_connect(*resolved); if (!stream_result) { ELIO_LOG_ERROR("Connection failed: {}", strerror(errno)); co_return 1; diff --git a/include/elio/debug.hpp b/include/elio/debug.hpp index a387dd4..0348956 100644 --- a/include/elio/debug.hpp +++ b/include/elio/debug.hpp @@ -17,7 +17,7 @@ /// elio list # List all vthreads /// elio workers # Show worker information /// -/// In LLDB: command script import /path/to/elio-lldb.py +/// In LLDB: command script import /path/to/elio_lldb.py /// elio bt # Show all vthread backtraces /// /// Command line: diff --git a/include/elio/elio.hpp b/include/elio/elio.hpp index 91938b1..86157a7 100644 --- a/include/elio/elio.hpp +++ b/include/elio/elio.hpp @@ -40,6 +40,7 @@ // Networking #include "net/tcp.hpp" +#include "net/resolve.hpp" #include "net/uds.hpp" // Timers diff --git a/include/elio/http/client_base.hpp b/include/elio/http/client_base.hpp index c422c2f..1ed335b 100644 --- a/include/elio/http/client_base.hpp +++ b/include/elio/http/client_base.hpp @@ -9,6 +9,7 @@ /// - Connection utility functions #include +#include #include #include #include @@ -16,9 +17,31 @@ #include #include +#include +#include namespace elio::http { +namespace detail { + +inline size_t next_rotation_offset(const std::string& host, uint16_t port, size_t count) { + if (count == 0) { + return 0; + } + + static std::mutex mutex; + static std::unordered_map state; + + std::lock_guard lock(mutex); + std::string key = host + ":" + std::to_string(port); + size_t& cursor = state[key]; + size_t offset = cursor % count; + cursor = (cursor + 1) % count; + return offset; +} + +} // namespace detail + /// Base configuration shared by all HTTP-based clients /// Can be embedded in more specific configuration structures struct base_client_config { @@ -27,6 +50,8 @@ struct base_client_config { size_t read_buffer_size = 8192; ///< Read buffer size std::string user_agent; ///< User-Agent header (empty = no header) bool verify_certificate = true; ///< Verify TLS certificates + net::resolve_options resolve_options = net::default_cached_resolve_options(); ///< DNS resolve/cache behavior + bool rotate_resolved_addresses = true; ///< Rotate start index across resolved addresses }; /// Initialize a TLS context for client use with default settings @@ -49,28 +74,56 @@ inline void init_client_tls_context(tls::tls_context& ctx, bool verify_certifica /// @return Connected stream or std::nullopt on error inline coro::task> client_connect(std::string_view host, uint16_t port, bool secure, - tls::tls_context* tls_ctx) { + tls::tls_context* tls_ctx, + net::resolve_options resolve_opts = net::default_cached_resolve_options(), + bool rotate_resolved_addresses = true) { + + auto addresses = co_await net::resolve_all(host, port, resolve_opts); + if (addresses.empty()) { + ELIO_LOG_ERROR("Failed to resolve {}:{}: {}", host, port, strerror(errno)); + co_return std::nullopt; + } + + size_t offset = rotate_resolved_addresses + ? detail::next_rotation_offset(std::string(host), port, addresses.size()) + : 0; + if (secure) { if (!tls_ctx) { ELIO_LOG_ERROR("TLS context required for secure connection to {}:{}", host, port); co_return std::nullopt; } - auto result = co_await tls::tls_connect(*tls_ctx, host, port); - if (!result) { - ELIO_LOG_ERROR("Failed to connect to {}:{}: {}", host, port, strerror(errno)); - co_return std::nullopt; + for (size_t i = 0; i < addresses.size(); ++i) { + const auto& addr = addresses[(offset + i) % addresses.size()]; + auto tcp = co_await net::tcp_connect(addr); + if (!tcp) { + continue; + } + + tls::tls_stream tls_stream(std::move(*tcp), *tls_ctx); + tls_stream.set_hostname(host); + auto hs = co_await tls_stream.handshake(); + if (!hs) { + continue; + } + + co_return net::stream(std::move(tls_stream)); } - co_return net::stream(std::move(*result)); + ELIO_LOG_ERROR("Failed to connect to {}:{}: {}", host, port, strerror(errno)); + co_return std::nullopt; } else { - auto result = co_await net::tcp_connect(host, port); - if (!result) { - ELIO_LOG_ERROR("Failed to connect to {}:{}: {}", host, port, strerror(errno)); - co_return std::nullopt; + for (size_t i = 0; i < addresses.size(); ++i) { + const auto& addr = addresses[(offset + i) % addresses.size()]; + auto result = co_await net::tcp_connect(addr); + if (result) { + co_return net::stream(std::move(*result)); + } } - co_return net::stream(std::move(*result)); + ELIO_LOG_ERROR("Failed to connect to {}:{}: {}", host, port, strerror(errno)); + co_return std::nullopt; } } diff --git a/include/elio/http/http2_client.hpp b/include/elio/http/http2_client.hpp index ddcf0e0..ac2e287 100644 --- a/include/elio/http/http2_client.hpp +++ b/include/elio/http/http2_client.hpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include @@ -16,6 +17,7 @@ #include #include #include +#include namespace elio::http { @@ -27,6 +29,8 @@ struct h2_client_config { uint32_t initial_window_size = 65535; ///< Initial flow control window size std::string user_agent = "elio-http2/1.0"; ///< User-Agent header bool enable_push = false; ///< Enable server push (rarely needed) + net::resolve_options resolve_options = net::default_cached_resolve_options(); ///< DNS resolve/cache behavior + bool rotate_resolved_addresses = true; ///< Rotate start index across resolved addresses }; /// HTTP/2 connection wrapper @@ -197,44 +201,58 @@ class h2_client { co_return std::move(conn); } - // Create new HTTP/2 connection - // First establish TCP connection - auto tcp_result = co_await net::tcp_connect(host, port); - if (!tcp_result) { - ELIO_LOG_ERROR("Failed to connect to {}:{}", host, port); + auto addresses = co_await net::resolve_all(host, port, config_.resolve_options); + if (addresses.empty()) { + ELIO_LOG_ERROR("Failed to resolve {}:{}", host, port); co_return std::nullopt; } - - // Create TLS stream with ALPN - tls::tls_stream tls_stream(std::move(*tcp_result), tls_ctx_); - tls_stream.set_hostname(host); - - // Perform TLS handshake - auto hs_result = co_await tls_stream.handshake(); - if (!hs_result) { - ELIO_LOG_ERROR("TLS handshake failed for {}:{}", host, port); - co_return std::nullopt; - } - - // Verify ALPN negotiated h2 - auto alpn = tls_stream.alpn_protocol(); - if (alpn != "h2") { - ELIO_LOG_ERROR("Server does not support HTTP/2 (ALPN: {})", - alpn.empty() ? "(none)" : std::string(alpn)); - co_return std::nullopt; + + size_t offset = 0; + if (config_.rotate_resolved_addresses) { + static std::mutex rotation_mutex; + static std::unordered_map rotation_cursor; + std::lock_guard lock(rotation_mutex); + size_t& cursor = rotation_cursor[key]; + offset = cursor % addresses.size(); + cursor = (cursor + 1) % addresses.size(); } - - ELIO_LOG_DEBUG("HTTP/2 connection established to {}:{}", host, port); - - h2_connection conn(std::move(tls_stream)); - - // Process initial frames (settings exchange) - if (!co_await conn.session()->process()) { - ELIO_LOG_ERROR("HTTP/2 session initialization failed"); - co_return std::nullopt; + + for (size_t i = 0; i < addresses.size(); ++i) { + const auto& addr = addresses[(offset + i) % addresses.size()]; + + auto tcp_result = co_await net::tcp_connect(addr); + if (!tcp_result) { + continue; + } + + tls::tls_stream tls_stream(std::move(*tcp_result), tls_ctx_); + tls_stream.set_hostname(host); + + auto hs_result = co_await tls_stream.handshake(); + if (!hs_result) { + continue; + } + + auto alpn = tls_stream.alpn_protocol(); + if (alpn != "h2") { + ELIO_LOG_ERROR("Server does not support HTTP/2 (ALPN: {})", + alpn.empty() ? "(none)" : std::string(alpn)); + continue; + } + + ELIO_LOG_DEBUG("HTTP/2 connection established to {}:{}", host, port); + + h2_connection conn(std::move(tls_stream)); + if (!co_await conn.session()->process()) { + ELIO_LOG_ERROR("HTTP/2 session initialization failed"); + continue; + } + + co_return std::move(conn); } - - co_return std::move(conn); + + ELIO_LOG_ERROR("Failed to connect to any resolved address for {}:{}", host, port); + co_return std::nullopt; } /// Return a connection to the pool diff --git a/include/elio/http/http_client.hpp b/include/elio/http/http_client.hpp index c4b11a6..0fd9aab 100644 --- a/include/elio/http/http_client.hpp +++ b/include/elio/http/http_client.hpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -39,6 +40,8 @@ using connection = net::stream; /// Connection pool for HTTP keep-alive class connection_pool { public: + static constexpr size_t shard_count = 16; + explicit connection_pool(client_config config = {}) : config_(config) {} @@ -48,12 +51,13 @@ class connection_pool { bool secure, tls::tls_context* tls_ctx = nullptr) { std::string key = make_key(host, port, secure); + auto& shard = shard_for(key); // Try to get an existing connection { - std::lock_guard lock(mutex_); - auto it = pools_.find(key); - if (it != pools_.end() && !it->second.empty()) { + std::lock_guard lock(shard.mutex); + auto it = shard.pools.find(key); + if (it != shard.pools.end() && !it->second.empty()) { auto conn = std::move(it->second.front()); it->second.pop_front(); @@ -68,7 +72,13 @@ class connection_pool { } // Create new connection using client_connect utility - auto result = co_await client_connect(host, port, secure, tls_ctx); + auto result = co_await client_connect( + host, + port, + secure, + tls_ctx, + config_.resolve_options, + config_.rotate_resolved_addresses); if (!result) { co_return std::nullopt; } @@ -79,9 +89,10 @@ class connection_pool { /// Return a connection to the pool void release(const std::string& host, uint16_t port, bool secure, connection conn) { std::string key = make_key(host, port, secure); + auto& shard = shard_for(key); - std::lock_guard lock(mutex_); - auto& pool = pools_[key]; + std::lock_guard lock(shard.mutex); + auto& pool = shard.pools[key]; if (pool.size() < config_.max_connections_per_host) { conn.touch(); @@ -92,18 +103,28 @@ class connection_pool { /// Clear all pooled connections void clear() { - std::lock_guard lock(mutex_); - pools_.clear(); + for (auto& shard : shards_) { + std::lock_guard lock(shard.mutex); + shard.pools.clear(); + } } private: static std::string make_key(const std::string& host, uint16_t port, bool secure) { return (secure ? "https://" : "http://") + host + ":" + std::to_string(port); } + + struct pool_shard { + std::mutex mutex; + std::unordered_map> pools; + }; + + pool_shard& shard_for(const std::string& key) noexcept { + return shards_[std::hash{}(key) % shard_count]; + } client_config config_; - std::mutex mutex_; - std::unordered_map> pools_; + std::array shards_; }; /// HTTP client diff --git a/include/elio/http/sse_client.hpp b/include/elio/http/sse_client.hpp index a42537a..0eed2ad 100644 --- a/include/elio/http/sse_client.hpp +++ b/include/elio/http/sse_client.hpp @@ -353,7 +353,12 @@ class sse_client { // Establish connection using shared utility auto conn_result = co_await http::client_connect( - url_.host, url_.effective_port(), url_.is_secure(), &tls_ctx_); + url_.host, + url_.effective_port(), + url_.is_secure(), + &tls_ctx_, + config_.resolve_options, + config_.rotate_resolved_addresses); if (!conn_result) { state_ = client_state::disconnected; co_return false; diff --git a/include/elio/http/websocket_client.hpp b/include/elio/http/websocket_client.hpp index 89d180f..1c1859d 100644 --- a/include/elio/http/websocket_client.hpp +++ b/include/elio/http/websocket_client.hpp @@ -248,7 +248,13 @@ class ws_client { } // Establish connection using shared utility - auto conn_result = co_await http::client_connect(host_, port, secure_, &tls_ctx_); + auto conn_result = co_await http::client_connect( + host_, + port, + secure_, + &tls_ctx_, + config_.resolve_options, + config_.rotate_resolved_addresses); if (!conn_result) { co_return false; } diff --git a/include/elio/net/resolve.hpp b/include/elio/net/resolve.hpp new file mode 100644 index 0000000..57e9de8 --- /dev/null +++ b/include/elio/net/resolve.hpp @@ -0,0 +1,382 @@ +#pragma once + +#include +#include +#include + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace elio::net { + +struct resolve_cache_key { + std::string host; + uint16_t port = 0; + + bool operator==(const resolve_cache_key& other) const noexcept { + return host == other.host && port == other.port; + } +}; + +struct resolve_cache_key_hash { + size_t operator()(const resolve_cache_key& key) const noexcept { + size_t seed = std::hash{}(key.host); + seed ^= static_cast(key.port) + 0x9e3779b97f4a7c15ULL + (seed << 6U) + (seed >> 2U); + return seed; + } +}; + +struct resolve_cache_stats { + size_t cache_hits = 0; + size_t cache_misses = 0; + size_t cache_stores = 0; + size_t cache_invalidations = 0; +}; + +struct resolve_cache_entry { + std::vector addresses; + std::chrono::steady_clock::time_point expires_at{}; +}; + +class resolve_cache { +public: + static constexpr size_t shard_count = 16; + + bool try_get(const resolve_cache_key& key, std::vector& out) { + auto& shard = shard_for(key); + std::lock_guard lock(shard.mutex); + prune_expired_locked(shard.entries, std::chrono::steady_clock::now()); + + auto it = shard.entries.find(key); + if (it == shard.entries.end()) { + return false; + } + + out = it->second.addresses; + cache_hits_.fetch_add(1, std::memory_order_relaxed); + return true; + } + + void store(const resolve_cache_key& key, + std::vector addresses, + std::chrono::seconds ttl) { + auto& shard = shard_for(key); + std::lock_guard lock(shard.mutex); + resolve_cache_entry entry; + entry.addresses = std::move(addresses); + entry.expires_at = std::chrono::steady_clock::now() + ttl; + shard.entries[key] = std::move(entry); + cache_stores_.fetch_add(1, std::memory_order_relaxed); + } + + bool invalidate(const std::string_view host, uint16_t port) { + resolve_cache_key key{std::string(host), port}; + auto& shard = shard_for(key); + std::lock_guard lock(shard.mutex); + size_t erased = shard.entries.erase(key); + if (erased > 0) { + cache_invalidations_.fetch_add(erased, std::memory_order_relaxed); + return true; + } + return false; + } + + size_t invalidate_host(const std::string_view host) { + size_t removed = 0; + for (auto& shard : shards_) { + std::lock_guard lock(shard.mutex); + for (auto it = shard.entries.begin(); it != shard.entries.end();) { + if (it->first.host == host) { + it = shard.entries.erase(it); + ++removed; + } else { + ++it; + } + } + } + if (removed > 0) { + cache_invalidations_.fetch_add(removed, std::memory_order_relaxed); + } + return removed; + } + + void clear() { + size_t removed = 0; + for (auto& shard : shards_) { + std::lock_guard lock(shard.mutex); + removed += shard.entries.size(); + shard.entries.clear(); + } + cache_invalidations_.fetch_add(removed, std::memory_order_relaxed); + } + + resolve_cache_stats stats() const noexcept { + resolve_cache_stats out; + out.cache_hits = cache_hits_.load(std::memory_order_relaxed); + out.cache_misses = cache_misses_.load(std::memory_order_relaxed); + out.cache_stores = cache_stores_.load(std::memory_order_relaxed); + out.cache_invalidations = cache_invalidations_.load(std::memory_order_relaxed); + return out; + } + + void record_miss() { + cache_misses_.fetch_add(1, std::memory_order_relaxed); + } + +private: + struct cache_shard { + std::mutex mutex; + std::unordered_map entries; + }; + + cache_shard& shard_for(const resolve_cache_key& key) noexcept { + size_t idx = resolve_cache_key_hash{}(key) % shard_count; + return shards_[idx]; + } + + static void prune_expired_locked( + std::unordered_map& entries, + std::chrono::steady_clock::time_point now) { + for (auto it = entries.begin(); it != entries.end();) { + if (it->second.expires_at <= now) { + it = entries.erase(it); + } else { + ++it; + } + } + } + + std::array shards_; + std::atomic cache_hits_{0}; + std::atomic cache_misses_{0}; + std::atomic cache_stores_{0}; + std::atomic cache_invalidations_{0}; +}; + +inline resolve_cache& default_resolve_cache() { + static resolve_cache cache; + return cache; +} + +struct resolve_options { + bool use_cache = false; + resolve_cache* cache = nullptr; + std::chrono::seconds positive_ttl{60}; + std::chrono::seconds negative_ttl{5}; +}; + +inline resolve_options default_cached_resolve_options() { + resolve_options opts; + opts.use_cache = true; + opts.cache = &default_resolve_cache(); + return opts; +} + +struct resolve_waiter_state { + std::vector results; + int error = 0; + runtime::scheduler* scheduler = nullptr; + std::coroutine_handle<> handle; + size_t saved_affinity = coro::NO_AFFINITY; + void* handle_address = nullptr; + + void restore_affinity() const noexcept { + if (!handle_address) { + return; + } + auto* promise = coro::get_promise_base(handle_address); + if (!promise) { + return; + } + if (saved_affinity == coro::NO_AFFINITY) { + promise->clear_affinity(); + } else { + promise->set_affinity(saved_affinity); + } + } +}; + +inline bool try_parse_ipv4_literal(std::string_view host, uint16_t port, + std::vector& out) { + struct in_addr addr{}; + std::string host_str(host); + if (inet_pton(AF_INET, host_str.c_str(), &addr) != 1) { + return false; + } + + ipv4_address parsed; + parsed.addr = addr.s_addr; + parsed.port = port; + out.push_back(socket_address(parsed)); + return true; +} + +inline bool try_parse_ipv6_literal(std::string_view host, uint16_t port, + std::vector& out) { + std::string ip_str(host); + uint32_t scope_id = 0; + size_t scope_pos = ip_str.find('%'); + if (scope_pos != std::string::npos) { + std::string scope_name = ip_str.substr(scope_pos + 1); + ip_str = ip_str.substr(0, scope_pos); + scope_id = if_nametoindex(scope_name.c_str()); + } + + struct in6_addr addr{}; + if (inet_pton(AF_INET6, ip_str.c_str(), &addr) != 1) { + return false; + } + + ipv6_address parsed; + parsed.addr = addr; + parsed.port = port; + parsed.scope_id = scope_id; + out.push_back(socket_address(parsed)); + return true; +} + +class resolve_all_awaitable { +public: + resolve_all_awaitable(std::string_view host, uint16_t port, resolve_options options) + : host_(host) + , key_{std::string(host), port} + , options_(options) + , state_(std::make_shared()) { + if (host.empty() || host == "::" || host == "0.0.0.0") { + state_->results.push_back(socket_address(host, port)); + return; + } + + if (host.find(':') != std::string_view::npos) { + try_parse_ipv6_literal(host, port, state_->results); + return; + } + + try_parse_ipv4_literal(host, port, state_->results); + } + + bool await_ready() const noexcept { + if (!state_->results.empty()) { + return true; + } + + if (!options_.use_cache) { + return false; + } + + resolve_cache* cache = options_.cache ? options_.cache : &default_resolve_cache(); + if (cache->try_get(key_, state_->results)) { + return true; + } + + cache->record_miss(); + return false; + } + + template + bool await_suspend(std::coroutine_handle awaiter) { + state_->handle = awaiter; + state_->scheduler = runtime::scheduler::current(); + state_->handle_address = awaiter.address(); + + if constexpr (std::is_base_of_v) { + state_->saved_affinity = awaiter.promise().affinity(); + auto* worker = runtime::worker_thread::current(); + if (worker) { + awaiter.promise().set_affinity(worker->worker_id()); + } + } + + auto host = host_; + auto key = key_; + auto options = options_; + auto state = state_; + + std::thread([host = std::move(host), key = std::move(key), options, state]() mutable { + struct addrinfo hints{}; + struct addrinfo* result = nullptr; + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + + std::string service = std::to_string(key.port); + int rc = getaddrinfo(host.c_str(), service.c_str(), &hints, &result); + if (rc == 0 && result) { + for (auto* current = result; current != nullptr; current = current->ai_next) { + if (current->ai_family == AF_INET6) { + auto* sa = reinterpret_cast(current->ai_addr); + state->results.push_back(socket_address(ipv6_address(*sa))); + } else if (current->ai_family == AF_INET) { + auto* sa = reinterpret_cast(current->ai_addr); + state->results.push_back(socket_address(ipv4_address(*sa))); + } + } + freeaddrinfo(result); + } + + if (state->results.empty()) { + state->error = (rc == EAI_SYSTEM) ? errno : EHOSTUNREACH; + if (options.use_cache) { + resolve_cache* cache = options.cache ? options.cache : &default_resolve_cache(); + cache->store(key, {}, options.negative_ttl); + } + } else if (options.use_cache) { + resolve_cache* cache = options.cache ? options.cache : &default_resolve_cache(); + cache->store(key, state->results, options.positive_ttl); + } + + if (state->scheduler && state->scheduler->is_running()) { + state->scheduler->spawn(state->handle); + } else { + runtime::schedule_handle(state->handle); + } + }).detach(); + + return true; + } + + std::vector await_resume() { + state_->restore_affinity(); + if (state_->results.empty()) { + errno = state_->error; + } + return state_->results; + } + +private: + std::string host_; + resolve_cache_key key_; + resolve_options options_; + std::shared_ptr state_; +}; + +inline auto resolve_all(std::string_view host, + uint16_t port, + resolve_options options = {}) { + return resolve_all_awaitable(host, port, options); +} + +inline coro::task> resolve_hostname(std::string_view host, + uint16_t port, + resolve_options options = {}) { + auto results = co_await resolve_all(host, port, options); + if (results.empty()) { + co_return std::nullopt; + } + co_return results.front(); +} + +} // namespace elio::net diff --git a/include/elio/net/stream.hpp b/include/elio/net/stream.hpp index 8e15627..5263f53 100644 --- a/include/elio/net/stream.hpp +++ b/include/elio/net/stream.hpp @@ -8,6 +8,7 @@ /// duplication in HTTP, WebSocket, and SSE clients. #include +#include #include #include #include @@ -178,18 +179,24 @@ class stream { /// @return Connected stream on success, std::nullopt on error inline coro::task> connect(std::string_view host, uint16_t port, bool secure = false, - tls::tls_context* tls_ctx = nullptr) { + tls::tls_context* tls_ctx = nullptr, + resolve_options resolve_opts = default_cached_resolve_options()) { if (secure) { if (!tls_ctx) { co_return std::nullopt; } - auto result = co_await tls::tls_connect(*tls_ctx, host, port); + auto result = co_await tls::tls_connect(*tls_ctx, host, port, resolve_opts); if (!result) { co_return std::nullopt; } co_return stream(std::move(*result)); } else { - auto result = co_await tcp_connect(host, port); + auto resolved = co_await resolve_hostname(host, port, resolve_opts); + if (!resolved) { + co_return std::nullopt; + } + + auto result = co_await tcp_connect(*resolved); if (!result) { co_return std::nullopt; } diff --git a/include/elio/net/tcp.hpp b/include/elio/net/tcp.hpp index 126161c..ffff828 100644 --- a/include/elio/net/tcp.hpp +++ b/include/elio/net/tcp.hpp @@ -48,23 +48,10 @@ struct ipv4_address { if (ip.empty() || ip == "0.0.0.0") { addr = INADDR_ANY; } else { - // First try as numeric IP - if (inet_pton(AF_INET, std::string(ip).c_str(), &addr) != 1) { - // Not a numeric IP, try DNS resolution - struct addrinfo hints{}; - struct addrinfo* result = nullptr; - hints.ai_family = AF_INET; - hints.ai_socktype = SOCK_STREAM; - - std::string ip_str(ip); - if (getaddrinfo(ip_str.c_str(), nullptr, &hints, &result) == 0 && result) { - auto* sa = reinterpret_cast(result->ai_addr); - addr = sa->sin_addr.s_addr; - freeaddrinfo(result); - } else { - ELIO_LOG_ERROR("Failed to resolve hostname: {}", ip); - addr = INADDR_ANY; - } + std::string ip_str(ip); + if (inet_pton(AF_INET, ip_str.c_str(), &addr) != 1) { + ELIO_LOG_ERROR("ipv4_address only accepts numeric IPv4 literals: {}", ip); + addr = INADDR_ANY; } } } @@ -115,23 +102,9 @@ struct ipv6_address { scope_id = if_nametoindex(scope_name.c_str()); } - // First try as numeric IP if (inet_pton(AF_INET6, ip_str.c_str(), &addr) != 1) { - // Not a numeric IP, try DNS resolution - struct addrinfo hints{}; - struct addrinfo* result = nullptr; - hints.ai_family = AF_INET6; - hints.ai_socktype = SOCK_STREAM; - - if (getaddrinfo(ip_str.c_str(), nullptr, &hints, &result) == 0 && result) { - auto* sa = reinterpret_cast(result->ai_addr); - addr = sa->sin6_addr; - scope_id = sa->sin6_scope_id; - freeaddrinfo(result); - } else { - ELIO_LOG_ERROR("Failed to resolve IPv6 hostname: {}", ip); - addr = IN6ADDR_ANY_INIT; - } + ELIO_LOG_ERROR("ipv6_address only accepts numeric IPv6 literals: {}", ip); + addr = IN6ADDR_ANY_INIT; } } } @@ -201,32 +174,8 @@ class socket_address { data_ = ipv6_address(host, port); return; } - - // Try to resolve and prefer IPv6 - struct addrinfo hints{}; - struct addrinfo* result = nullptr; - hints.ai_family = AF_UNSPEC; - hints.ai_socktype = SOCK_STREAM; - - std::string host_str(host); - if (getaddrinfo(host_str.c_str(), nullptr, &hints, &result) == 0 && result) { - // Use the first result - if (result->ai_family == AF_INET6) { - auto* sa = reinterpret_cast(result->ai_addr); - ipv6_address addr(*sa); - addr.port = port; - data_ = addr; - } else { - auto* sa = reinterpret_cast(result->ai_addr); - ipv4_address addr(*sa); - addr.port = port; - data_ = addr; - } - freeaddrinfo(result); - } else { - // Fallback to IPv4 - data_ = ipv4_address(host, port); - } + + data_ = ipv4_address(host, port); } /// Construct from sockaddr_storage @@ -773,10 +722,4 @@ inline auto tcp_connect(const socket_address& addr, return tcp_connect_awaitable(addr, opts); } -/// Connect to a remote TCP server by host and port (auto-detects IPv4/IPv6) -inline auto tcp_connect(std::string_view host, uint16_t port, - const tcp_options& opts = {}) { - return tcp_connect_awaitable(socket_address(host, port), opts); -} - } // namespace elio::net diff --git a/include/elio/rpc/rpc_client.hpp b/include/elio/rpc/rpc_client.hpp index dcc0a72..e0aabc6 100644 --- a/include/elio/rpc/rpc_client.hpp +++ b/include/elio/rpc/rpc_client.hpp @@ -26,11 +26,16 @@ #include #include #include +#include #include +#include +#include #include #include -#include +#include +#include +#include namespace elio::rpc { @@ -73,6 +78,8 @@ class rpc_client : public std::enable_shared_from_this> { public: using stream_type = Stream; using ptr = std::shared_ptr; + + static constexpr size_t pending_shard_count = 16; /// Create a new RPC client from an existing stream static ptr create(Stream stream) { @@ -84,13 +91,54 @@ class rpc_client : public std::enable_shared_from_this> { static coro::task> connect(Args&&... args) requires std::is_same_v { - auto stream = co_await net::tcp_connect(std::forward(args)...); - if (!stream) { + if constexpr (requires { net::tcp_connect(std::forward(args)...); }) { + auto stream = co_await net::tcp_connect(std::forward(args)...); + if (!stream) { + co_return std::nullopt; + } + auto client = create(std::move(*stream)); + client->start_receive_loop(); + co_return client; + } else if constexpr ( + sizeof...(Args) == 2 && + std::is_convertible_v...>>, std::string_view> && + std::is_integral_v...>>>) { + auto forwarded = std::forward_as_tuple(std::forward(args)...); + std::string_view host = std::get<0>(forwarded); + uint16_t port = static_cast(std::get<1>(forwarded)); + + auto addresses = co_await net::resolve_all(host, port); + for (const auto& addr : addresses) { + auto stream = co_await net::tcp_connect(addr); + if (stream) { + auto client = create(std::move(*stream)); + client->start_receive_loop(); + co_return client; + } + } co_return std::nullopt; + } else { + static_assert(sizeof...(Args) == 0, + "rpc_client::connect arguments are not supported"); } - auto client = create(std::move(*stream)); - client->start_receive_loop(); - co_return client; + } + + /// Connect to a TCP server and create client with explicit resolve options + static coro::task> connect(std::string_view host, + uint16_t port, + net::resolve_options resolve_opts) + requires std::is_same_v + { + auto addresses = co_await net::resolve_all(host, port, resolve_opts); + for (const auto& addr : addresses) { + auto stream = co_await net::tcp_connect(addr); + if (stream) { + auto client = create(std::move(*stream)); + client->start_receive_loop(); + co_return client; + } + } + co_return std::nullopt; } /// Connect to a UDS server and create client @@ -128,15 +176,15 @@ class rpc_client : public std::enable_shared_from_this> { } // Cancel all pending requests - { - std::lock_guard lock(pending_mutex_); - for (auto& [id, req] : pending_requests_) { + for (auto& shard : pending_shards_) { + std::lock_guard lock(shard.mutex); + for (auto& [id, req] : shard.requests) { if (req->try_complete()) { req->error = rpc_error::connection_closed; req->completion_event.set(); } } - pending_requests_.clear(); + shard.requests.clear(); } } @@ -216,8 +264,9 @@ class rpc_client : public std::enable_shared_from_this> { auto pending = std::make_shared(); { - std::lock_guard lock(pending_mutex_); - pending_requests_[request_id] = pending; + auto& shard = pending_shard_for(request_id); + std::lock_guard lock(shard.mutex); + shard.requests[request_id] = pending; } // Register cancellation callback @@ -239,8 +288,9 @@ class rpc_client : public std::enable_shared_from_this> { bool sent = co_await write_frame(stream_, request_frame.first, request_frame.second); if (!sent) { - std::lock_guard lock(pending_mutex_); - pending_requests_.erase(request_id); + auto& shard = pending_shard_for(request_id); + std::lock_guard lock(shard.mutex); + shard.requests.erase(request_id); co_return rpc_result(rpc_error::connection_closed); } } @@ -279,8 +329,9 @@ class rpc_client : public std::enable_shared_from_this> { // Remove from pending { - std::lock_guard lock(pending_mutex_); - pending_requests_.erase(request_id); + auto& shard = pending_shard_for(request_id); + std::lock_guard lock(shard.mutex); + shard.requests.erase(request_id); } // Check result @@ -332,8 +383,9 @@ class rpc_client : public std::enable_shared_from_this> { auto pending = std::make_shared(); { - std::lock_guard lock(pending_mutex_); - pending_requests_[ping_id] = pending; + auto& shard = pending_shard_for(ping_id); + std::lock_guard lock(shard.mutex); + shard.requests[ping_id] = pending; } // Send ping @@ -345,8 +397,9 @@ class rpc_client : public std::enable_shared_from_this> { buffer_writer empty; bool sent = co_await write_frame(stream_, header, empty); if (!sent) { - std::lock_guard lock(pending_mutex_); - pending_requests_.erase(ping_id); + auto& shard = pending_shard_for(ping_id); + std::lock_guard lock(shard.mutex); + shard.requests.erase(ping_id); co_return false; } } @@ -374,8 +427,9 @@ class rpc_client : public std::enable_shared_from_this> { co_await pending->completion_event.wait(); { - std::lock_guard lock(pending_mutex_); - pending_requests_.erase(ping_id); + auto& shard = pending_shard_for(ping_id); + std::lock_guard lock(shard.mutex); + shard.requests.erase(ping_id); } co_return !pending->timed_out; @@ -449,9 +503,10 @@ class rpc_client : public std::enable_shared_from_this> { std::shared_ptr pending; { - std::lock_guard lock(pending_mutex_); - auto it = pending_requests_.find(header.request_id); - if (it == pending_requests_.end()) { + auto& shard = pending_shard_for(header.request_id); + std::lock_guard lock(shard.mutex); + auto it = shard.requests.find(header.request_id); + if (it == shard.requests.end()) { ELIO_LOG_WARNING("RPC client: received response for unknown request {}", header.request_id); return; @@ -472,9 +527,10 @@ class rpc_client : public std::enable_shared_from_this> { std::shared_ptr pending; { - std::lock_guard lock(pending_mutex_); - auto it = pending_requests_.find(request_id); - if (it == pending_requests_.end()) { + auto& shard = pending_shard_for(request_id); + std::lock_guard lock(shard.mutex); + auto it = shard.requests.find(request_id); + if (it == shard.requests.end()) { return; } pending = it->second; @@ -488,10 +544,17 @@ class rpc_client : public std::enable_shared_from_this> { Stream stream_; std::atomic closed_{false}; request_id_generator id_generator_; - - // Pending requests map - std::mutex pending_mutex_; - std::unordered_map> pending_requests_; + + struct pending_shard { + std::mutex mutex; + std::unordered_map> requests; + }; + + pending_shard& pending_shard_for(uint32_t request_id) noexcept { + return pending_shards_[request_id % pending_shard_count]; + } + + std::array pending_shards_; // Send mutex for serializing writes sync::mutex send_mutex_; diff --git a/include/elio/runtime/scheduler.hpp b/include/elio/runtime/scheduler.hpp index b060c69..9208c7a 100644 --- a/include/elio/runtime/scheduler.hpp +++ b/include/elio/runtime/scheduler.hpp @@ -9,7 +9,6 @@ #include #include #include -#include namespace elio::runtime { diff --git a/include/elio/tls/tls_stream.hpp b/include/elio/tls/tls_stream.hpp index 9bf427f..af50e59 100644 --- a/include/elio/tls/tls_stream.hpp +++ b/include/elio/tls/tls_stream.hpp @@ -2,6 +2,7 @@ #include #include +#include #include #include #include @@ -337,24 +338,33 @@ class tls_stream { /// @param port Port to connect to /// @return TLS stream on success, std::nullopt on error (check errno) inline coro::task> -tls_connect(tls_context& ctx, std::string_view host, uint16_t port) { - // First establish TCP connection - auto tcp_result = co_await net::tcp_connect(host, port); - if (!tcp_result) { +tls_connect(tls_context& ctx, + std::string_view host, + uint16_t port, + net::resolve_options resolve_opts = net::default_cached_resolve_options()) { + auto resolved = co_await net::resolve_all(host, port, resolve_opts); + if (resolved.empty()) { co_return std::nullopt; } - - // Create TLS stream - tls_stream stream(std::move(*tcp_result), ctx); - stream.set_hostname(host); - - // Perform handshake - auto hs_result = co_await stream.handshake(); - if (!hs_result) { - co_return std::nullopt; + + for (const auto& addr : resolved) { + auto tcp_result = co_await net::tcp_connect(addr); + if (!tcp_result) { + continue; + } + + tls_stream stream(std::move(*tcp_result), ctx); + stream.set_hostname(host); + + auto hs_result = co_await stream.handshake(); + if (!hs_result) { + continue; + } + + co_return std::move(stream); } - - co_return std::move(stream); + + co_return std::nullopt; } /// TLS listener for accepting secure connections diff --git a/tests/unit/test_io.cpp b/tests/unit/test_io.cpp index cc3bc11..0bcd8d3 100644 --- a/tests/unit/test_io.cpp +++ b/tests/unit/test_io.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include @@ -1011,6 +1012,75 @@ static task tcp_connect_regression_attempt( co_return; } +static task accept_n_connections( + tcp_listener& listener, + int count, + std::atomic& accepted) { + for (int i = 0; i < count; ++i) { + auto stream = co_await listener.accept(); + if (stream) { + accepted.fetch_add(1, std::memory_order_relaxed); + } + } + co_return; +} + +static task tcp_connect_hostname_attempt( + std::string host, + uint16_t port, + std::atomic& connected, + std::atomic& failed, + std::atomic& first_error) { + resolve_options options; + options.use_cache = true; + + auto addresses = co_await resolve_all(host, port, options); + if (addresses.empty()) { + failed.fetch_add(1, std::memory_order_relaxed); + int err = errno ? errno : EHOSTUNREACH; + int expected = 0; + first_error.compare_exchange_strong(expected, err); + co_return; + } + + int last_error = 0; + for (const auto& addr : addresses) { + auto stream = co_await tcp_connect(addr); + if (stream) { + connected.fetch_add(1, std::memory_order_relaxed); + co_return; + } + last_error = errno; + } + + failed.fetch_add(1, std::memory_order_relaxed); + int err = last_error ? last_error : EHOSTUNREACH; + int expected = 0; + first_error.compare_exchange_strong(expected, err); + co_return; +} + +static task resolve_hostname_attempt( + std::string host, + uint16_t port, + std::optional& resolved, + std::atomic& done) { + resolved = co_await resolve_hostname(host, port); + done.store(true, std::memory_order_relaxed); + co_return; +} + +static task resolve_all_attempt_with_options( + std::string host, + uint16_t port, + resolve_options options, + std::vector& resolved, + std::atomic& done) { + resolved = co_await resolve_all(host, port, options); + done.store(true, std::memory_order_relaxed); + co_return; +} + TEST_CASE("ipv4_address basic operations", "[tcp][address][ipv4]") { SECTION("default constructor") { ipv4_address addr; @@ -1247,13 +1317,229 @@ TEST_CASE("TCP connect regression avoids double connect", "[tcp][connect][regres REQUIRE(failed == 0); } -TEST_CASE("socket_address with hostname resolution", "[tcp][address][dns]") { - // Test that socket_address can be constructed from "localhost" - // This tests the DNS resolution path - SECTION("localhost resolves") { - socket_address addr("localhost", 80); - // Should resolve to either IPv4 or IPv6 - REQUIRE((addr.is_v4() || addr.is_v6())); - REQUIRE(addr.port() == 80); +TEST_CASE("explicit hostname resolution", "[tcp][address][dns]") { + SECTION("localhost resolves asynchronously") { + scheduler sched(1); + sched.start(); + + std::optional resolved; + std::atomic done{false}; + + auto task = resolve_hostname_attempt("localhost", 80, resolved, done); + sched.spawn(task.release()); + + for (int i = 0; i < 200 && !done.load(std::memory_order_relaxed); ++i) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + sched.shutdown(); + + REQUIRE(done.load(std::memory_order_relaxed)); + REQUIRE(resolved.has_value()); + REQUIRE((resolved->is_v4() || resolved->is_v6())); + REQUIRE(resolved->port() == 80); + } +} + +TEST_CASE("tcp_connect hostname resolution uses cache", "[tcp][connect][dns][cache]") { + default_resolve_cache().clear(); + + auto listener = tcp_listener::bind(socket_address(0)); + REQUIRE(listener.has_value()); + + const uint16_t port = listener->local_address().port(); + REQUIRE(port > 0); + + std::atomic accepted{0}; + std::atomic connected{0}; + std::atomic failed{0}; + std::atomic first_error{0}; + + scheduler sched(2); + sched.start(); + + auto stats_before = default_resolve_cache().stats(); + + auto accept_task = accept_n_connections(*listener, 2, accepted); + sched.spawn(accept_task.release()); + + auto first_task = tcp_connect_hostname_attempt("localhost", port, connected, failed, first_error); + sched.spawn(first_task.release()); + + for (int i = 0; i < 300 && connected.load(std::memory_order_relaxed) < 1; ++i) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); } + + auto stats_after_first = default_resolve_cache().stats(); + + auto second_task = tcp_connect_hostname_attempt("localhost", port, connected, failed, first_error); + sched.spawn(second_task.release()); + + for (int i = 0; i < 300 && (accepted.load(std::memory_order_relaxed) < 2 + || connected.load(std::memory_order_relaxed) < 2 + || failed.load(std::memory_order_relaxed) != 0); ++i) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + sched.shutdown(); + + auto stats_after_second = default_resolve_cache().stats(); + + INFO("connected=" << connected.load() << ", failed=" << failed.load() + << ", first errno=" << first_error.load()); + INFO("before stats: hits=" << stats_before.cache_hits + << ", misses=" << stats_before.cache_misses + << ", stores=" << stats_before.cache_stores + << ", invalidations=" << stats_before.cache_invalidations); + INFO("first stats: hits=" << stats_after_first.cache_hits + << ", misses=" << stats_after_first.cache_misses + << ", stores=" << stats_after_first.cache_stores + << ", invalidations=" << stats_after_first.cache_invalidations); + INFO("second stats: hits=" << stats_after_second.cache_hits + << ", misses=" << stats_after_second.cache_misses + << ", stores=" << stats_after_second.cache_stores + << ", invalidations=" << stats_after_second.cache_invalidations); + + REQUIRE(accepted == 2); + REQUIRE(connected == 2); + REQUIRE(failed == 0); + REQUIRE(stats_after_first.cache_misses >= (stats_before.cache_misses + 1)); + REQUIRE(stats_after_first.cache_stores >= (stats_before.cache_stores + 1)); + REQUIRE(stats_after_second.cache_hits >= (stats_after_first.cache_hits + 1)); + REQUIRE(stats_after_second.cache_misses == stats_after_first.cache_misses); +} + +TEST_CASE("resolve_options can disable cache", "[tcp][dns][cache][config]") { + default_resolve_cache().clear(); + auto stats_before = default_resolve_cache().stats(); + + resolve_options options; + options.use_cache = false; + + scheduler sched(1); + sched.start(); + + std::vector resolved_first; + std::vector resolved_second; + std::atomic done_first{false}; + std::atomic done_second{false}; + + auto first = resolve_all_attempt_with_options( + "localhost", 80, options, resolved_first, done_first); + sched.spawn(first.release()); + + for (int i = 0; i < 200 && !done_first.load(std::memory_order_relaxed); ++i) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + auto second = resolve_all_attempt_with_options( + "localhost", 80, options, resolved_second, done_second); + sched.spawn(second.release()); + + for (int i = 0; i < 200 && !done_second.load(std::memory_order_relaxed); ++i) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + sched.shutdown(); + + auto stats_after = default_resolve_cache().stats(); + REQUIRE(done_first.load(std::memory_order_relaxed)); + REQUIRE(done_second.load(std::memory_order_relaxed)); + REQUIRE_FALSE(resolved_first.empty()); + REQUIRE_FALSE(resolved_second.empty()); + REQUIRE(stats_after.cache_hits == stats_before.cache_hits); + REQUIRE(stats_after.cache_misses == stats_before.cache_misses); + REQUIRE(stats_after.cache_stores == stats_before.cache_stores); +} + +TEST_CASE("resolve_options can use custom cache instance", "[tcp][dns][cache][config]") { + default_resolve_cache().clear(); + auto default_before = default_resolve_cache().stats(); + + resolve_cache custom_cache; + resolve_options options; + options.use_cache = true; + options.cache = &custom_cache; + + scheduler sched(1); + sched.start(); + + std::vector resolved_first; + std::vector resolved_second; + std::atomic done_first{false}; + std::atomic done_second{false}; + + auto first = resolve_all_attempt_with_options( + "localhost", 80, options, resolved_first, done_first); + sched.spawn(first.release()); + + for (int i = 0; i < 200 && !done_first.load(std::memory_order_relaxed); ++i) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + auto second = resolve_all_attempt_with_options( + "localhost", 80, options, resolved_second, done_second); + sched.spawn(second.release()); + + for (int i = 0; i < 200 && !done_second.load(std::memory_order_relaxed); ++i) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + sched.shutdown(); + + auto custom_after = custom_cache.stats(); + auto default_after = default_resolve_cache().stats(); + + REQUIRE(done_first.load(std::memory_order_relaxed)); + REQUIRE(done_second.load(std::memory_order_relaxed)); + REQUIRE_FALSE(resolved_first.empty()); + REQUIRE_FALSE(resolved_second.empty()); + REQUIRE(custom_after.cache_misses >= 1); + REQUIRE(custom_after.cache_stores >= 1); + REQUIRE(custom_after.cache_hits >= 1); + REQUIRE(default_after.cache_hits == default_before.cache_hits); + REQUIRE(default_after.cache_misses == default_before.cache_misses); + REQUIRE(default_after.cache_stores == default_before.cache_stores); +} + +TEST_CASE("resolve_options ttl controls cache expiry", "[tcp][dns][cache][config]") { + resolve_cache cache; + resolve_options options; + options.use_cache = true; + options.cache = &cache; + options.positive_ttl = std::chrono::seconds(0); + + scheduler sched(1); + sched.start(); + + std::vector resolved_first; + std::vector resolved_second; + std::atomic done_first{false}; + std::atomic done_second{false}; + + auto first = resolve_all_attempt_with_options( + "localhost", 80, options, resolved_first, done_first); + sched.spawn(first.release()); + + for (int i = 0; i < 200 && !done_first.load(std::memory_order_relaxed); ++i) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + auto second = resolve_all_attempt_with_options( + "localhost", 80, options, resolved_second, done_second); + sched.spawn(second.release()); + + for (int i = 0; i < 200 && !done_second.load(std::memory_order_relaxed); ++i) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + sched.shutdown(); + + auto stats = cache.stats(); + REQUIRE(done_first.load(std::memory_order_relaxed)); + REQUIRE(done_second.load(std::memory_order_relaxed)); + REQUIRE_FALSE(resolved_first.empty()); + REQUIRE_FALSE(resolved_second.empty()); + REQUIRE(stats.cache_misses >= 2); + REQUIRE(stats.cache_hits == 0); } diff --git a/tools/elio-lldb.py b/tools/elio-lldb.py index 6724998..249acd8 100755 --- a/tools/elio-lldb.py +++ b/tools/elio-lldb.py @@ -6,7 +6,8 @@ It finds coroutine frames by traversing the scheduler's worker queues. Usage: - In LLDB: command script import /path/to/elio-lldb.py + Preferred: command script import /path/to/elio_lldb.py + (This file remains the implementation module.) Commands: elio list - List all vthreads from worker queues diff --git a/tools/elio_lldb.py b/tools/elio_lldb.py new file mode 100644 index 0000000..9625892 --- /dev/null +++ b/tools/elio_lldb.py @@ -0,0 +1,29 @@ +#!/usr/bin/env python3 +"""LLDB import entrypoint that loads the hyphen-named implementation. + +Use this file with: + command script import /path/to/tools/elio_lldb.py +""" + +import importlib.util +import pathlib + + +def _load_impl(): + script_path = pathlib.Path(__file__).with_name("elio-lldb.py") + spec = importlib.util.spec_from_file_location("elio_lldb_impl", script_path) + module = importlib.util.module_from_spec(spec) + assert spec.loader is not None + spec.loader.exec_module(module) + return module + + +_impl = _load_impl() + + +def __getattr__(name): + return getattr(_impl, name) + + +def __lldb_init_module(debugger, internal_dict): + return _impl.__lldb_init_module(debugger, internal_dict) diff --git a/wiki/Core-Concepts.md b/wiki/Core-Concepts.md index 11aba17..692469f 100644 --- a/wiki/Core-Concepts.md +++ b/wiki/Core-Concepts.md @@ -255,7 +255,7 @@ The overhead is minimal -- one pointer per coroutine frame, set during construct ### What it enables - **`elio-pstack`**: A CLI tool that attaches to a running process (or reads a coredump) and walks the virtual stack chains to print coroutine backtraces, similar to `pstack` for threads. -- **Debugger extensions**: `elio-gdb.py` and `elio-lldb.py` use the same frame linkage to implement `elio bt` (backtrace) and `elio list` (list active coroutines). +- **Debugger extensions**: `elio-gdb.py` and `elio_lldb.py` use the same frame linkage to implement `elio bt` (backtrace) and `elio list` (list active coroutines). - **Exception propagation**: When a coroutine throws, `unhandled_exception()` captures it in the promise. The parent coroutine can then rethrow the exception when it `co_await`s the child's result, propagating errors up the logical call chain. ### Frame metadata diff --git a/wiki/Debugging.md b/wiki/Debugging.md index 916799b..a822884 100644 --- a/wiki/Debugging.md +++ b/wiki/Debugging.md @@ -40,7 +40,7 @@ Every frame carries the following debug metadata with no additional allocation: | `parent_` | Pointer to the calling frame's promise, forming the virtual stack chain | | `frame_magic_` | Magic number for frame integrity validation | -The debugger tools (`elio-pstack`, `elio-gdb.py`, `elio-lldb.py`) use this metadata to present coroutine state in a format familiar to anyone who has used `pstack` or `thread apply all bt`. +The debugger tools (`elio-pstack`, `elio-gdb.py`, `elio_lldb.py`) use this metadata to present coroutine state in a format familiar to anyone who has used `pstack` or `thread apply all bt`. ## Tools @@ -48,7 +48,7 @@ The debugger tools (`elio-pstack`, `elio-gdb.py`, `elio-lldb.py`) use this metad |------|-------------| | `elio-pstack` | Command-line tool similar to `pstack` | | `elio-gdb.py` | GDB Python extension | -| `elio-lldb.py` | LLDB Python extension | +| `elio_lldb.py` | LLDB import entrypoint (loads `elio-lldb.py`) | ## elio-pstack @@ -177,13 +177,13 @@ Total tasks executed: 4780 ```bash # From LLDB command line -lldb -o 'command script import /path/to/tools/elio-lldb.py' ./myapp +lldb -o 'command script import /path/to/tools/elio_lldb.py' ./myapp # Or in LLDB session -(lldb) command script import /path/to/tools/elio-lldb.py +(lldb) command script import /path/to/tools/elio_lldb.py # Or add to ~/.lldbinit -command script import /path/to/tools/elio-lldb.py +command script import /path/to/tools/elio_lldb.py ``` ### Commands