From fd88918a6bfb681f7c4598c995fec1b83d4d1513 Mon Sep 17 00:00:00 2001 From: chenBright Date: Thu, 10 Jul 2025 23:12:46 +0800 Subject: [PATCH] Support leaky ThreadId --- src/babylon/concurrent/BUILD | 1 + src/babylon/concurrent/id_allocator.h | 29 ++++++---- src/babylon/concurrent/id_allocator.hpp | 47 ++++++++++++----- src/babylon/concurrent/thread_local.h | 70 +++++++++++++------------ 4 files changed, 88 insertions(+), 59 deletions(-) diff --git a/src/babylon/concurrent/BUILD b/src/babylon/concurrent/BUILD index c6f25104..a67bc44e 100644 --- a/src/babylon/concurrent/BUILD +++ b/src/babylon/concurrent/BUILD @@ -98,6 +98,7 @@ cc_library( strip_include_prefix = '//src', deps = [ ':vector', + '//src/babylon:sanitizer_helper', ], ) diff --git a/src/babylon/concurrent/id_allocator.h b/src/babylon/concurrent/id_allocator.h index 0a5ebc85..d3f70905 100644 --- a/src/babylon/concurrent/id_allocator.h +++ b/src/babylon/concurrent/id_allocator.h @@ -89,10 +89,13 @@ class IdAllocator { // 获取当前所在线程的唯一标识 // 原理上syscall -// __NR_gettid/pthread_self/std::this_thread::get_id都可以获取到唯一标识 -// 但是ThreadId的实现基于IdAllocator,可以提供【尽量小】且【尽量连续】的编号能力 -class ThreadId { - public: +// __NR_gettid/pthread_self/std::this_thread::get_id都可以获取到唯一标识。 +// 但是ThreadId/LeakyThreadId的实现基于IdAllocator,可以提供【尽量小】且【尽量连续】的编号能力。 +// 在不析构的场景下,推荐使用`Leaky=true'的模式。 +namespace internal { +template +class ThreadIdImpl { +public: // 获取当前线程的标识 // 支持最多活跃65534个线程,对于合理的服务端程序设计已经足够 // T: 每个类型有自己的IdAllocator,在线程局部存储设计中可以用来隔离不同类型 @@ -116,18 +119,22 @@ class ThreadId { IsInvocable::value>::type> static void for_each(C&& callback); - private: +private: // 内部类型,用thread local持有,使用者不应尝试构造 - inline ThreadId(IdAllocator& allocator); - ThreadId(ThreadId&&) = delete; - ThreadId(const ThreadId&) = delete; - ThreadId& operator=(ThreadId&&) = delete; - ThreadId& operator=(const ThreadId&) = delete; - inline ~ThreadId() noexcept; + inline ThreadIdImpl(IdAllocator& allocator); + ThreadIdImpl(ThreadIdImpl&&) = delete; + ThreadIdImpl(const ThreadIdImpl&) = delete; + ThreadIdImpl& operator=(ThreadIdImpl&&) = delete; + ThreadIdImpl& operator=(const ThreadIdImpl&) = delete; + inline ~ThreadIdImpl() noexcept; IdAllocator& _allocator; VersionedValue _value; }; +} // namespace internal + +class ThreadId : public internal::ThreadIdImpl {}; +class LeakyThreadId : public internal::ThreadIdImpl {}; BABYLON_NAMESPACE_END diff --git a/src/babylon/concurrent/id_allocator.hpp b/src/babylon/concurrent/id_allocator.hpp index 6ab8360e..8449ccf4 100644 --- a/src/babylon/concurrent/id_allocator.hpp +++ b/src/babylon/concurrent/id_allocator.hpp @@ -1,6 +1,7 @@ #pragma once #include "babylon/concurrent/id_allocator.h" +#include "babylon/sanitizer_helper.h" // BABYLON_LEAK_CHECK_DISABLER // clang-format off #include BABYLON_EXTERNAL(absl/base/optimization.h) // ABSL_PREDICT_FALSE @@ -138,9 +139,22 @@ inline ::std::basic_ostream& operator<<( namespace internal { namespace concurrent_id_allocator { -template +template struct IdAllocatorFotType { - static IdAllocator& instance() noexcept { + template + static typename ::std::enable_if&>::type + instance() noexcept { +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wpragmas" +#pragma GCC diagnostic ignored "-Wunknown-warning-option" + BABYLON_LEAK_CHECK_DISABLER; + static auto object = new IdAllocator(); +#pragma GCC diagnostic pop + return *object; + } + template + static typename ::std::enable_if&>::type + instance() noexcept { #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wpragmas" #pragma GCC diagnostic ignored "-Wunknown-warning-option" @@ -151,8 +165,8 @@ struct IdAllocatorFotType { } }; } // namespace concurrent_id_allocator -} // namespace internal +template template // The identity of thread_local within inline function has some bug as mentioned // in https://gcc.gnu.org/bugzilla/show_bug.cgi?id=85400 @@ -164,35 +178,40 @@ ABSL_ATTRIBUTE_NOINLINE inline ABSL_ATTRIBUTE_ALWAYS_INLINE #endif // __clang__ || BABYLON_GCC_VERSION >= 80400 // clang-format off -VersionedValue ThreadId::current_thread_id() noexcept { +VersionedValue ThreadIdImpl::current_thread_id() noexcept { // clang-format on #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wpragmas" #pragma GCC diagnostic ignored "-Wunknown-warning-option" #pragma GCC diagnostic ignored "-Wexit-time-destructors" - thread_local ThreadId id( - internal::concurrent_id_allocator::IdAllocatorFotType::instance()); + thread_local ThreadIdImpl id( + concurrent_id_allocator::IdAllocatorFotType::instance()); #pragma GCC diagnostic pop return id._value; } +template template -inline uint16_t ThreadId::end() noexcept { - return internal::concurrent_id_allocator::IdAllocatorFotType::instance() - .end(); +inline uint16_t ThreadIdImpl::end() noexcept { + return concurrent_id_allocator::IdAllocatorFotType:: + instance().end(); } +template template -ABSL_ATTRIBUTE_NOINLINE void ThreadId::for_each(C&& callback) { - internal::concurrent_id_allocator::IdAllocatorFotType::instance().for_each( - ::std::forward(callback)); +ABSL_ATTRIBUTE_NOINLINE void ThreadIdImpl::for_each(C&& callback) { + concurrent_id_allocator::IdAllocatorFotType::instance(). + for_each(::std::forward(callback)); } -inline ThreadId::ThreadId(IdAllocator& allocator) +template +inline ThreadIdImpl::ThreadIdImpl(IdAllocator& allocator) : _allocator(allocator), _value(allocator.allocate()) {} -inline ThreadId::~ThreadId() noexcept { +template +inline ThreadIdImpl::~ThreadIdImpl() noexcept { _allocator.deallocate(_value); } +} // namespace internal BABYLON_NAMESPACE_END diff --git a/src/babylon/concurrent/thread_local.h b/src/babylon/concurrent/thread_local.h index 99f9bfaa..5d064681 100644 --- a/src/babylon/concurrent/thread_local.h +++ b/src/babylon/concurrent/thread_local.h @@ -1,7 +1,7 @@ #pragma once #include "babylon/sanitizer_helper.h" // BABYLON_LEAK_CHECK_DISABLER -#include "babylon/concurrent/id_allocator.h" // IdAllocator, ThreadId +#include "babylon/concurrent/id_allocator.h" // IdAllocator, ThreadId, LeakyThreadId #include // 等baidu/adu-lab/energon-onboard升级完成后去掉 @@ -10,8 +10,9 @@ BABYLON_NAMESPACE_BEGIN // 提供了遍历功能的线程局部存储 // 在提供接近thread_local的访问性能同时,支持遍历这些线程局部存储 // 另一点和thread_local的区别是不再要求是static的,所以可以支持动态多个 -template +template class EnumerableThreadLocal { + using ThreadIdType = typename std::conditional::type; public: // 可默认构造,可以移动,不能拷贝 EnumerableThreadLocal() noexcept; @@ -45,7 +46,8 @@ class EnumerableThreadLocal { auto snapshot = _storage.snapshot(); snapshot.for_each( 0, - ::std::min(ThreadId::end(), static_cast(snapshot.size())), + ::std::min(ThreadIdType::template end(), + static_cast(snapshot.size())), callback); } @@ -55,7 +57,8 @@ class EnumerableThreadLocal { auto snapshot = _storage.snapshot(); snapshot.for_each( 0, - ::std::min(ThreadId::end(), static_cast(snapshot.size())), + ::std::min(ThreadIdType::template end(), + static_cast(snapshot.size())), callback); } @@ -64,7 +67,7 @@ class EnumerableThreadLocal { IsInvocable::value>::type> inline void for_each_alive(C&& callback) { auto snapshot = _storage.snapshot(); - ThreadId::for_each([&](uint16_t begin, uint16_t end) { + ThreadIdType::template for_each([&](uint16_t begin, uint16_t end) { snapshot.for_each(begin, end, callback); }); } @@ -74,7 +77,7 @@ class EnumerableThreadLocal { inline void for_each_alive(C&& callback) const { auto snapshot = _storage.snapshot(); uint16_t size = snapshot.size(); - ThreadId::for_each([&](uint16_t begin, uint16_t end) { + ThreadIdType::template for_each([&](uint16_t begin, uint16_t end) { begin = ::std::min(begin, size); end = ::std::min(end, size); snapshot.for_each(begin, end, callback); @@ -92,8 +95,8 @@ class EnumerableThreadLocal { size_t _id {fetch_add_id()}; }; -template -struct EnumerableThreadLocal::Cache { +template +struct EnumerableThreadLocal::Cache { size_t id {0}; T* item {nullptr}; }; @@ -199,7 +202,6 @@ class CompactEnumerableThreadLocal { #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wpragmas" #pragma GCC diagnostic ignored "-Wunknown-warning-option" -#pragma GCC diagnostic ignored "-Wexit-time-destructors" BABYLON_LEAK_CHECK_DISABLER; static auto allocator = new IdAllocator(); #pragma GCC diagnostic pop @@ -223,7 +225,6 @@ class CompactEnumerableThreadLocal { #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wpragmas" #pragma GCC diagnostic ignored "-Wunknown-warning-option" -#pragma GCC diagnostic ignored "-Wexit-time-destructors" BABYLON_LEAK_CHECK_DISABLER; static auto storage_vector = new StorageVector(); #pragma GCC diagnostic pop @@ -248,71 +249,72 @@ class CompactEnumerableThreadLocal { //////////////////////////////////////////////////////////////////////////////// // EnumerableThreadLocal begin -template -inline EnumerableThreadLocal::EnumerableThreadLocal() noexcept { - // 确保内部使用的ThreadId此时初始化,建立正确的析构顺序 - ThreadId::end(); +template +inline EnumerableThreadLocal::EnumerableThreadLocal() noexcept { + // 确保内部使用的ThreadIdType此时初始化,建立正确的析构顺序 + ThreadIdType::template end(); } -template -EnumerableThreadLocal::EnumerableThreadLocal( +template +EnumerableThreadLocal::EnumerableThreadLocal( EnumerableThreadLocal&& other) noexcept : EnumerableThreadLocal {} { *this = ::std::move(other); } -template -EnumerableThreadLocal& EnumerableThreadLocal::operator=( +template +EnumerableThreadLocal& EnumerableThreadLocal::operator=( EnumerableThreadLocal&& other) noexcept { ::std::swap(_id, other._id); ::std::swap(_storage, other._storage); return *this; } -template +template template -inline EnumerableThreadLocal::EnumerableThreadLocal(C&& constructor) noexcept +inline EnumerableThreadLocal::EnumerableThreadLocal( + C&& constructor) noexcept : _storage {::std::forward(constructor)} { - // 确保内部使用的ThreadId此时初始化,建立正确的析构顺序 - ThreadId::end(); + // 确保内部使用的ThreadIdType此时初始化,建立正确的析构顺序 + ThreadIdType::template end(); } -template -ABSL_ATTRIBUTE_NOINLINE void EnumerableThreadLocal::set_constructor( +template +ABSL_ATTRIBUTE_NOINLINE void EnumerableThreadLocal::set_constructor( ::std::function constructor) noexcept { _storage.set_constructor(constructor); } -template -inline T& EnumerableThreadLocal::local() noexcept { +template +inline T& EnumerableThreadLocal::local() noexcept { auto item = local_fast(); if (ABSL_PREDICT_FALSE(item == nullptr)) { - item = &_storage.ensure(ThreadId::current_thread_id().value); + item = &_storage.ensure(ThreadIdType::template current_thread_id().value); _s_cache.id = _id; _s_cache.item = item; } return *item; } -template +template ABSL_ATTRIBUTE_ALWAYS_INLINE inline T* -EnumerableThreadLocal::local_fast() noexcept { +EnumerableThreadLocal::local_fast() noexcept { if (ABSL_PREDICT_TRUE(_s_cache.id == _id)) { return _s_cache.item; } return nullptr; } -template +template ABSL_ATTRIBUTE_NOINLINE size_t -EnumerableThreadLocal::fetch_add_id() noexcept { +EnumerableThreadLocal::fetch_add_id() noexcept { static ::std::atomic next_id {1}; return next_id.fetch_add(1, ::std::memory_order_relaxed); } -template -thread_local - typename EnumerableThreadLocal::Cache EnumerableThreadLocal::_s_cache; +template +thread_local typename EnumerableThreadLocal::Cache +EnumerableThreadLocal::_s_cache; // EnumerableThreadLocal end ////////////////////////////////////////////////////////////////////////////////