Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/babylon/concurrent/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ cc_library(
strip_include_prefix = '//src',
deps = [
':vector',
'//src/babylon:sanitizer_helper',
],
)

Expand Down
29 changes: 18 additions & 11 deletions src/babylon/concurrent/id_allocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,13 @@ class IdAllocator {

// 获取当前所在线程的唯一标识
// 原理上syscall
// __NR_gettid/pthread_self/std::this_thread::get_id都可以获取到唯一标识
// 但是ThreadId的实现基于IdAllocator,可以提供【尽量小】且【尽量连续】的编号能力
class ThreadId {
public:
// __NR_gettid/pthread_self/std::this_thread::get_id都可以获取到唯一标识。
// 但是ThreadId/LeakyThreadId的实现基于IdAllocator,可以提供【尽量小】且【尽量连续】的编号能力。
// 在不析构的场景下,推荐使用`Leaky=true'的模式。
namespace internal {
template <bool Leaky>
class ThreadIdImpl {
public:
// 获取当前线程的标识
// 支持最多活跃65534个线程,对于合理的服务端程序设计已经足够
// T: 每个类型有自己的IdAllocator,在线程局部存储设计中可以用来隔离不同类型
Expand All @@ -116,18 +119,22 @@ class ThreadId {
IsInvocable<C, uint16_t, uint16_t>::value>::type>
static void for_each(C&& callback);

private:
private:
// 内部类型,用thread local持有,使用者不应尝试构造
inline ThreadId(IdAllocator<uint16_t>& allocator);
ThreadId(ThreadId&&) = delete;
ThreadId(const ThreadId&) = delete;
ThreadId& operator=(ThreadId&&) = delete;
ThreadId& operator=(const ThreadId&) = delete;
inline ~ThreadId() noexcept;
inline ThreadIdImpl(IdAllocator<uint16_t>& allocator);
ThreadIdImpl(ThreadIdImpl&&) = delete;
ThreadIdImpl(const ThreadIdImpl&) = delete;
ThreadIdImpl& operator=(ThreadIdImpl&&) = delete;
ThreadIdImpl& operator=(const ThreadIdImpl&) = delete;
inline ~ThreadIdImpl() noexcept;

IdAllocator<uint16_t>& _allocator;
VersionedValue<uint16_t> _value;
};
} // namespace internal

class ThreadId : public internal::ThreadIdImpl<false> {};
class LeakyThreadId : public internal::ThreadIdImpl<true> {};

BABYLON_NAMESPACE_END

Expand Down
47 changes: 33 additions & 14 deletions src/babylon/concurrent/id_allocator.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include "babylon/concurrent/id_allocator.h"
#include "babylon/sanitizer_helper.h" // BABYLON_LEAK_CHECK_DISABLER

// clang-format off
#include BABYLON_EXTERNAL(absl/base/optimization.h) // ABSL_PREDICT_FALSE
Expand Down Expand Up @@ -138,9 +139,22 @@ inline ::std::basic_ostream<C, T>& operator<<(

namespace internal {
namespace concurrent_id_allocator {
template <typename T>
template <typename T, bool Leaky = false>
struct IdAllocatorFotType {
static IdAllocator<uint16_t>& instance() noexcept {
template <bool L = Leaky>
static typename ::std::enable_if<L, IdAllocator<uint16_t>&>::type
instance() noexcept {
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wpragmas"
#pragma GCC diagnostic ignored "-Wunknown-warning-option"
BABYLON_LEAK_CHECK_DISABLER;
static auto object = new IdAllocator<uint32_t>();
#pragma GCC diagnostic pop
return *object;
}
template <bool L = Leaky>
static typename ::std::enable_if<!L, IdAllocator<uint16_t>&>::type
instance() noexcept {
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wpragmas"
#pragma GCC diagnostic ignored "-Wunknown-warning-option"
Expand All @@ -151,8 +165,8 @@ struct IdAllocatorFotType {
}
};
} // namespace concurrent_id_allocator
} // namespace internal

template <bool Leaky>
template <typename T>
// The identity of thread_local within inline function has some bug as mentioned
// in https://gcc.gnu.org/bugzilla/show_bug.cgi?id=85400
Expand All @@ -164,35 +178,40 @@ ABSL_ATTRIBUTE_NOINLINE
inline ABSL_ATTRIBUTE_ALWAYS_INLINE
#endif // __clang__ || BABYLON_GCC_VERSION >= 80400
// clang-format off
VersionedValue<uint16_t> ThreadId::current_thread_id() noexcept {
VersionedValue<uint16_t> ThreadIdImpl<Leaky>::current_thread_id() noexcept {
// clang-format on
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wpragmas"
#pragma GCC diagnostic ignored "-Wunknown-warning-option"
#pragma GCC diagnostic ignored "-Wexit-time-destructors"
thread_local ThreadId id(
internal::concurrent_id_allocator::IdAllocatorFotType<T>::instance());
thread_local ThreadIdImpl id(
concurrent_id_allocator::IdAllocatorFotType<T, Leaky>::instance());
#pragma GCC diagnostic pop
return id._value;
}

template <bool Leaky>
template <typename T>
inline uint16_t ThreadId::end() noexcept {
return internal::concurrent_id_allocator::IdAllocatorFotType<T>::instance()
.end();
inline uint16_t ThreadIdImpl<Leaky>::end() noexcept {
return concurrent_id_allocator::IdAllocatorFotType<T, Leaky>::
instance().end();
}

template <bool Leaky>
template <typename T, typename C, typename>
ABSL_ATTRIBUTE_NOINLINE void ThreadId::for_each(C&& callback) {
internal::concurrent_id_allocator::IdAllocatorFotType<T>::instance().for_each(
::std::forward<C>(callback));
ABSL_ATTRIBUTE_NOINLINE void ThreadIdImpl<Leaky>::for_each(C&& callback) {
concurrent_id_allocator::IdAllocatorFotType<T, Leaky>::instance().
for_each(::std::forward<C>(callback));
}

inline ThreadId::ThreadId(IdAllocator<uint16_t>& allocator)
template <bool Leaky>
inline ThreadIdImpl<Leaky>::ThreadIdImpl(IdAllocator<uint16_t>& allocator)
: _allocator(allocator), _value(allocator.allocate()) {}

inline ThreadId::~ThreadId() noexcept {
template <bool Leaky>
inline ThreadIdImpl<Leaky>::~ThreadIdImpl() noexcept {
_allocator.deallocate(_value);
}
} // namespace internal

BABYLON_NAMESPACE_END
70 changes: 36 additions & 34 deletions src/babylon/concurrent/thread_local.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#pragma once

#include "babylon/sanitizer_helper.h" // BABYLON_LEAK_CHECK_DISABLER
#include "babylon/concurrent/id_allocator.h" // IdAllocator, ThreadId
#include "babylon/concurrent/id_allocator.h" // IdAllocator, ThreadId, LeakyThreadId

#include <math.h> // 等baidu/adu-lab/energon-onboard升级完成后去掉

Expand All @@ -10,8 +10,9 @@ BABYLON_NAMESPACE_BEGIN
// 提供了遍历功能的线程局部存储
// 在提供接近thread_local的访问性能同时,支持遍历这些线程局部存储
// 另一点和thread_local的区别是不再要求是static的,所以可以支持动态多个
template <typename T>
template <typename T, bool Leaky = false>
class EnumerableThreadLocal {
using ThreadIdType = typename std::conditional<Leaky, LeakyThreadId, ThreadId>::type;
public:
// 可默认构造,可以移动,不能拷贝
EnumerableThreadLocal() noexcept;
Expand Down Expand Up @@ -45,7 +46,8 @@ class EnumerableThreadLocal {
auto snapshot = _storage.snapshot();
snapshot.for_each(
0,
::std::min(ThreadId::end<T>(), static_cast<uint16_t>(snapshot.size())),
::std::min(ThreadIdType::template end<T>(),
static_cast<uint16_t>(snapshot.size())),
callback);
}

Expand All @@ -55,7 +57,8 @@ class EnumerableThreadLocal {
auto snapshot = _storage.snapshot();
snapshot.for_each(
0,
::std::min(ThreadId::end<T>(), static_cast<uint16_t>(snapshot.size())),
::std::min(ThreadIdType::template end<T>(),
static_cast<uint16_t>(snapshot.size())),
callback);
}

Expand All @@ -64,7 +67,7 @@ class EnumerableThreadLocal {
IsInvocable<C, T*, T*>::value>::type>
inline void for_each_alive(C&& callback) {
auto snapshot = _storage.snapshot();
ThreadId::for_each<T>([&](uint16_t begin, uint16_t end) {
ThreadIdType::template for_each<T>([&](uint16_t begin, uint16_t end) {
snapshot.for_each(begin, end, callback);
});
}
Expand All @@ -74,7 +77,7 @@ class EnumerableThreadLocal {
inline void for_each_alive(C&& callback) const {
auto snapshot = _storage.snapshot();
uint16_t size = snapshot.size();
ThreadId::for_each<T>([&](uint16_t begin, uint16_t end) {
ThreadIdType::template for_each<T>([&](uint16_t begin, uint16_t end) {
begin = ::std::min(begin, size);
end = ::std::min(end, size);
snapshot.for_each(begin, end, callback);
Expand All @@ -92,8 +95,8 @@ class EnumerableThreadLocal {
size_t _id {fetch_add_id()};
};

template <typename T>
struct EnumerableThreadLocal<T>::Cache {
template <typename T, bool Leaky>
struct EnumerableThreadLocal<T, Leaky>::Cache {
size_t id {0};
T* item {nullptr};
};
Expand Down Expand Up @@ -199,7 +202,6 @@ class CompactEnumerableThreadLocal {
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wpragmas"
#pragma GCC diagnostic ignored "-Wunknown-warning-option"
#pragma GCC diagnostic ignored "-Wexit-time-destructors"
BABYLON_LEAK_CHECK_DISABLER;
static auto allocator = new IdAllocator<uint32_t>();
#pragma GCC diagnostic pop
Expand All @@ -223,7 +225,6 @@ class CompactEnumerableThreadLocal {
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wpragmas"
#pragma GCC diagnostic ignored "-Wunknown-warning-option"
#pragma GCC diagnostic ignored "-Wexit-time-destructors"
BABYLON_LEAK_CHECK_DISABLER;
static auto storage_vector = new StorageVector();
#pragma GCC diagnostic pop
Expand All @@ -248,71 +249,72 @@ class CompactEnumerableThreadLocal {

////////////////////////////////////////////////////////////////////////////////
// EnumerableThreadLocal begin
template <typename T>
inline EnumerableThreadLocal<T>::EnumerableThreadLocal() noexcept {
// 确保内部使用的ThreadId此时初始化,建立正确的析构顺序
ThreadId::end<T>();
template <typename T, bool Leaky>
inline EnumerableThreadLocal<T, Leaky>::EnumerableThreadLocal() noexcept {
// 确保内部使用的ThreadIdType此时初始化,建立正确的析构顺序
ThreadIdType::template end<T>();
}

template <typename T>
EnumerableThreadLocal<T>::EnumerableThreadLocal(
template <typename T, bool Leaky>
EnumerableThreadLocal<T, Leaky>::EnumerableThreadLocal(
EnumerableThreadLocal&& other) noexcept
: EnumerableThreadLocal {} {
*this = ::std::move(other);
}

template <typename T>
EnumerableThreadLocal<T>& EnumerableThreadLocal<T>::operator=(
template <typename T, bool Leaky>
EnumerableThreadLocal<T, Leaky>& EnumerableThreadLocal<T, Leaky>::operator=(
EnumerableThreadLocal&& other) noexcept {
::std::swap(_id, other._id);
::std::swap(_storage, other._storage);
return *this;
}

template <typename T>
template <typename T, bool Leaky>
template <typename C>
inline EnumerableThreadLocal<T>::EnumerableThreadLocal(C&& constructor) noexcept
inline EnumerableThreadLocal<T, Leaky>::EnumerableThreadLocal(
C&& constructor) noexcept
: _storage {::std::forward<C>(constructor)} {
// 确保内部使用的ThreadId此时初始化,建立正确的析构顺序
ThreadId::end<T>();
// 确保内部使用的ThreadIdType此时初始化,建立正确的析构顺序
ThreadIdType::template end<T>();
}

template <typename T>
ABSL_ATTRIBUTE_NOINLINE void EnumerableThreadLocal<T>::set_constructor(
template <typename T, bool Leaky>
ABSL_ATTRIBUTE_NOINLINE void EnumerableThreadLocal<T, Leaky>::set_constructor(
::std::function<void(T*)> constructor) noexcept {
_storage.set_constructor(constructor);
}

template <typename T>
inline T& EnumerableThreadLocal<T>::local() noexcept {
template <typename T, bool Leaky>
inline T& EnumerableThreadLocal<T, Leaky>::local() noexcept {
auto item = local_fast();
if (ABSL_PREDICT_FALSE(item == nullptr)) {
item = &_storage.ensure(ThreadId::current_thread_id<T>().value);
item = &_storage.ensure(ThreadIdType::template current_thread_id<T>().value);
_s_cache.id = _id;
_s_cache.item = item;
}
return *item;
}

template <typename T>
template <typename T, bool Leaky>
ABSL_ATTRIBUTE_ALWAYS_INLINE inline T*
EnumerableThreadLocal<T>::local_fast() noexcept {
EnumerableThreadLocal<T, Leaky>::local_fast() noexcept {
if (ABSL_PREDICT_TRUE(_s_cache.id == _id)) {
return _s_cache.item;
}
return nullptr;
}

template <typename T>
template <typename T, bool Leaky>
ABSL_ATTRIBUTE_NOINLINE size_t
EnumerableThreadLocal<T>::fetch_add_id() noexcept {
EnumerableThreadLocal<T, Leaky>::fetch_add_id() noexcept {
static ::std::atomic<size_t> next_id {1};
return next_id.fetch_add(1, ::std::memory_order_relaxed);
}

template <typename T>
thread_local
typename EnumerableThreadLocal<T>::Cache EnumerableThreadLocal<T>::_s_cache;
template <typename T, bool Leaky>
thread_local typename EnumerableThreadLocal<T, Leaky>::Cache
EnumerableThreadLocal<T, Leaky>::_s_cache;
// EnumerableThreadLocal end
////////////////////////////////////////////////////////////////////////////////

Expand Down
Loading