From ff295d7f117eef590542499a128465f2a1b231b1 Mon Sep 17 00:00:00 2001 From: Pavel Grebnev Date: Wed, 24 Nov 2021 10:15:57 +0300 Subject: [PATCH 1/2] Added _emplace variants for enqueue methods + unit tests --- concurrentqueue.h | 81 ++++++++++++++++++++++++++-------- tests/unittests/unittests.cpp | 83 +++++++++++++++++++++++++++++++++++ 2 files changed, 146 insertions(+), 18 deletions(-) diff --git a/concurrentqueue.h b/concurrentqueue.h index 609ca4ab..a7740451 100644 --- a/concurrentqueue.h +++ b/concurrentqueue.h @@ -988,6 +988,18 @@ class ConcurrentQueue else return inner_enqueue(std::move(item)); } + // Enqueues a single item (by constructing it in-place from arguments). + // Allocates memory if required. Only fails if memory allocation fails (or implicit + // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0, + // or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed). + // Thread-safe. + template + inline bool enqueue_emplace(Args&&... args) + { + MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false; + else return inner_enqueue(std::forward(args)...); + } + // Enqueues a single item (by copying it) using an explicit producer token. // Allocates memory if required. Only fails if memory allocation fails (or // Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed). @@ -1006,6 +1018,17 @@ class ConcurrentQueue return inner_enqueue(token, std::move(item)); } + // Enqueues a single item (by constructing it in-place from arguments) using an explicit + // producer token. + // Allocates memory if required. Only fails if memory allocation fails (or + // Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed). + // Thread-safe. + template + inline bool enqueue_token_emplace(producer_token_t const& token, Args&&... args) + { + return inner_enqueue(token, std::forward(args)...); + } + // Enqueues several items. // Allocates memory if required. Only fails if memory allocation fails (or // implicit production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE @@ -1053,6 +1076,18 @@ class ConcurrentQueue else return inner_enqueue(std::move(item)); } + // Enqueues a single item (by constructing it in-place from arguments). + // Does not allocate memory (except for one-time implicit producer). + // Fails if not enough room to enqueue (or implicit production is + // disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0). + // Thread-safe. + template + inline bool try_enqueue_emplace(Args&&... args) + { + MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false; + else return inner_enqueue(std::forward(args)...); + } + // Enqueues a single item (by copying it) using an explicit producer token. // Does not allocate memory. Fails if not enough room to enqueue. // Thread-safe. @@ -1069,6 +1104,16 @@ class ConcurrentQueue return inner_enqueue(token, std::move(item)); } + // Enqueues a single item (by constructing it in-place from arguments) using an explicit + // producer token. + // Does not allocate memory. Fails if not enough room to enqueue. + // Thread-safe. + template + inline bool try_enqueue_token_emplace(producer_token_t const& token, Args&&... args) + { + return inner_enqueue(token, std::forward(args)...); + } + // Enqueues several items. // Does not allocate memory (except for one-time implicit producer). // Fails if not enough room to enqueue (or implicit production is @@ -1342,17 +1387,17 @@ class ConcurrentQueue // Queue methods /////////////////////////////// - template - inline bool inner_enqueue(producer_token_t const& token, U&& element) + template + inline bool inner_enqueue(producer_token_t const& token, U&&... element) { - return static_cast(token.producer)->ConcurrentQueue::ExplicitProducer::template enqueue(std::forward(element)); + return static_cast(token.producer)->ConcurrentQueue::ExplicitProducer::template enqueue(std::forward(element)...); } - template - inline bool inner_enqueue(U&& element) + template + inline bool inner_enqueue(U&&... element) { auto producer = get_or_add_implicit_producer(); - return producer == nullptr ? false : producer->ConcurrentQueue::ImplicitProducer::template enqueue(std::forward(element)); + return producer == nullptr ? false : producer->ConcurrentQueue::ImplicitProducer::template enqueue(std::forward(element)...); } template @@ -1830,8 +1875,8 @@ class ConcurrentQueue } } - template - inline bool enqueue(U&& element) + template + inline bool enqueue(U&&... element) { index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed); index_t newTailIndex = 1 + currentTailIndex; @@ -1897,11 +1942,11 @@ class ConcurrentQueue ++pr_blockIndexSlotsUsed; } - MOODYCAMEL_CONSTEXPR_IF (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (static_cast(nullptr)) T(std::forward(element)))) { + MOODYCAMEL_CONSTEXPR_IF (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (static_cast(nullptr)) T(std::forward(element)...))) { // The constructor may throw. We want the element not to appear in the queue in // that case (without corrupting the queue): MOODYCAMEL_TRY { - new ((*this->tailBlock)[currentTailIndex]) T(std::forward(element)); + new ((*this->tailBlock)[currentTailIndex]) T(std::forward(element)...); } MOODYCAMEL_CATCH (...) { // Revert change to the current block, but leave the new block available @@ -1923,14 +1968,14 @@ class ConcurrentQueue blockIndex.load(std::memory_order_relaxed)->front.store(pr_blockIndexFront, std::memory_order_release); pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1); - MOODYCAMEL_CONSTEXPR_IF (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (static_cast(nullptr)) T(std::forward(element)))) { + MOODYCAMEL_CONSTEXPR_IF (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (static_cast(nullptr)) T(std::forward(element)...))) { this->tailIndex.store(newTailIndex, std::memory_order_release); return true; } } // Enqueue - new ((*this->tailBlock)[currentTailIndex]) T(std::forward(element)); + new ((*this->tailBlock)[currentTailIndex]) T(std::forward(element)...); this->tailIndex.store(newTailIndex, std::memory_order_release); return true; @@ -2468,8 +2513,8 @@ class ConcurrentQueue } } - template - inline bool enqueue(U&& element) + template + inline bool enqueue(U&&... element) { index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed); index_t newTailIndex = 1 + currentTailIndex; @@ -2501,10 +2546,10 @@ class ConcurrentQueue #endif newBlock->ConcurrentQueue::Block::template reset_empty(); - MOODYCAMEL_CONSTEXPR_IF (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (static_cast(nullptr)) T(std::forward(element)))) { + MOODYCAMEL_CONSTEXPR_IF (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (static_cast(nullptr)) T(std::forward(element)...))) { // May throw, try to insert now before we publish the fact that we have this new block MOODYCAMEL_TRY { - new ((*newBlock)[currentTailIndex]) T(std::forward(element)); + new ((*newBlock)[currentTailIndex]) T(std::forward(element)...); } MOODYCAMEL_CATCH (...) { rewind_block_index_tail(); @@ -2519,14 +2564,14 @@ class ConcurrentQueue this->tailBlock = newBlock; - MOODYCAMEL_CONSTEXPR_IF (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (static_cast(nullptr)) T(std::forward(element)))) { + MOODYCAMEL_CONSTEXPR_IF (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (static_cast(nullptr)) T(std::forward(element)...))) { this->tailIndex.store(newTailIndex, std::memory_order_release); return true; } } // Enqueue - new ((*this->tailBlock)[currentTailIndex]) T(std::forward(element)); + new ((*this->tailBlock)[currentTailIndex]) T(std::forward(element)...); this->tailIndex.store(newTailIndex, std::memory_order_release); return true; diff --git a/tests/unittests/unittests.cpp b/tests/unittests/unittests.cpp index 029f7428..c30d59ea 100644 --- a/tests/unittests/unittests.cpp +++ b/tests/unittests/unittests.cpp @@ -198,6 +198,29 @@ struct Moveable { #endif }; +struct Emplaceable { + Emplaceable() : moved(false), copied(false), copyData(0), moveData(0) {} + Emplaceable(const Copyable& copyData, Moveable&& moveData) : moved(false), copied(false), copyData(copyData), moveData(std::move(moveData)) { } + Emplaceable(Emplaceable&& o) MOODYCAMEL_NOEXCEPT : moved(true), copied(o.copied), copyData(o.copyData), moveData(std::move(o.moveData)) { } + void operator=(Emplaceable&& o) MOODYCAMEL_NOEXCEPT { moved = true; copied = o.copied; copyData = o.copyData; moveData = std::move(o.moveData); } + bool moved; + bool copied; + Copyable copyData; + Moveable moveData; + +#if defined(_MSC_VER) && _MSC_VER < 1800 + // VS2012's std::is_nothrow_[move_]constructible is broken, so the queue never attempts to + // move objects with that compiler. In this case, we don't know whether it's really a copy + // or not being done, so give the benefit of the doubt (given the tests pass on other platforms) + // and assume it would have done a move if it could have (don't set copied to true). + Emplaceable(Emplaceable const& o) MOODYCAMEL_NOEXCEPT : moved(o.moved), copied(o.copied), copyData(o.copyData), moveData(o.moveData) { } + void operator=(Emplaceable const& o) MOODYCAMEL_NOEXCEPT { moved = o.moved; copied = o.copied; copyData = o.copyData; moveData = o.moveData; } +#else + Emplaceable(Emplaceable const& o) MOODYCAMEL_NOEXCEPT : moved(o.moved), copied(true), copyData(o.copyData), moveData(o.moveData) { } + void operator=(Emplaceable const& o) MOODYCAMEL_NOEXCEPT { moved = o.moved; copied = true; copyData = o.copyData; moveData = o.moveData; } +#endif +}; + struct ThrowingMovable { static std::atomic& ctorCount() { static std::atomic c; return c; } static std::atomic& destroyCount() { static std::atomic c; return c; } @@ -3233,6 +3256,20 @@ class ConcurrentQueueTests : public TestClass ASSERT_OR_FAIL(!item.copied); ASSERT_OR_FAIL(!q.try_dequeue(item)); } + // enqueue_emplace(Args&&...) + { + ConcurrentQueue q; + ASSERT_OR_FAIL(q.enqueue_emplace(Copyable(1234), Moveable(12345))); + Emplaceable item; + ASSERT_OR_FAIL(q.try_dequeue(item)); + ASSERT_OR_FAIL(item.copyData.id == 1234); + ASSERT_OR_FAIL(item.moveData.id == 12345); + ASSERT_OR_FAIL(item.moved); + ASSERT_OR_FAIL(!item.copied); + ASSERT_OR_FAIL(item.moveData.moved); + ASSERT_OR_FAIL(!item.moveData.copied); + ASSERT_OR_FAIL(!q.try_dequeue(item)); + } { ConcurrentQueue q; Moveable original(12345); @@ -3279,6 +3316,21 @@ class ConcurrentQueueTests : public TestClass ASSERT_OR_FAIL(!item.copied); ASSERT_OR_FAIL(!q.try_dequeue(item)); } + // enqueue_emplace(Token, Args&&...) + { + ConcurrentQueue q; + ProducerToken t(q); + ASSERT_OR_FAIL(q.enqueue_token_emplace(t, Copyable(1234), Moveable(12345))); + Emplaceable item; + ASSERT_OR_FAIL(q.try_dequeue(item)); + ASSERT_OR_FAIL(item.copyData.id == 1234); + ASSERT_OR_FAIL(item.moveData.id == 12345); + ASSERT_OR_FAIL(item.moved); + ASSERT_OR_FAIL(!item.copied); + ASSERT_OR_FAIL(item.moveData.moved); + ASSERT_OR_FAIL(!item.moveData.copied); + ASSERT_OR_FAIL(!q.try_dequeue(item)); + } { ConcurrentQueue q; ProducerToken t(q); @@ -3346,6 +3398,21 @@ class ConcurrentQueueTests : public TestClass ASSERT_OR_FAIL(!q.try_dequeue(item)); } + // try_enqueue_emplace(Args&&...) + { + ConcurrentQueue q; + ASSERT_OR_FAIL(q.try_enqueue_emplace(Copyable(1234), Moveable(12345))); + Emplaceable item; + ASSERT_OR_FAIL(q.try_dequeue(item)); + ASSERT_OR_FAIL(item.copyData.id == 1234); + ASSERT_OR_FAIL(item.moveData.id == 12345); + ASSERT_OR_FAIL(item.moved); + ASSERT_OR_FAIL(!item.copied); + ASSERT_OR_FAIL(item.moveData.moved); + ASSERT_OR_FAIL(!item.moveData.copied); + ASSERT_OR_FAIL(!q.try_dequeue(item)); + } + // try_enqueue(Token, T const&) { ConcurrentQueue q; @@ -3394,6 +3461,22 @@ class ConcurrentQueueTests : public TestClass ASSERT_OR_FAIL(!q.try_dequeue(item)); } + // try_enqueue_emplace(Token, Args&&...) + { + ConcurrentQueue q; + ProducerToken t(q); + ASSERT_OR_FAIL(q.try_enqueue_token_emplace(t, Copyable(1234), Moveable(12345))); + Emplaceable item; + ASSERT_OR_FAIL(q.try_dequeue(item)); + ASSERT_OR_FAIL(item.copyData.id == 1234); + ASSERT_OR_FAIL(item.moveData.id == 12345); + ASSERT_OR_FAIL(item.moved); + ASSERT_OR_FAIL(!item.copied); + ASSERT_OR_FAIL(item.moveData.moved); + ASSERT_OR_FAIL(!item.moveData.copied); + ASSERT_OR_FAIL(!q.try_dequeue(item)); + } + // enqueue_bulk(It itemFirst, size_t count) { ConcurrentQueue q; From 76d23050fa88548b0d457d575d8667c04eb70ae8 Mon Sep 17 00:00:00 2001 From: Pavel Grebnev Date: Sun, 26 Dec 2021 23:26:11 +0100 Subject: [PATCH 2/2] Refactor: renamed emplaceable argument from 'element' to 'args' and the type from 'U' to 'Args' for internal methods to denote that these are constructor arguments --- concurrentqueue.h | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/concurrentqueue.h b/concurrentqueue.h index a7740451..ab54ce79 100644 --- a/concurrentqueue.h +++ b/concurrentqueue.h @@ -1387,17 +1387,17 @@ class ConcurrentQueue // Queue methods /////////////////////////////// - template - inline bool inner_enqueue(producer_token_t const& token, U&&... element) + template + inline bool inner_enqueue(producer_token_t const& token, Args&&... args) { - return static_cast(token.producer)->ConcurrentQueue::ExplicitProducer::template enqueue(std::forward(element)...); + return static_cast(token.producer)->ConcurrentQueue::ExplicitProducer::template enqueue(std::forward(args)...); } - template - inline bool inner_enqueue(U&&... element) + template + inline bool inner_enqueue(Args&&... args) { auto producer = get_or_add_implicit_producer(); - return producer == nullptr ? false : producer->ConcurrentQueue::ImplicitProducer::template enqueue(std::forward(element)...); + return producer == nullptr ? false : producer->ConcurrentQueue::ImplicitProducer::template enqueue(std::forward(args)...); } template @@ -1875,8 +1875,8 @@ class ConcurrentQueue } } - template - inline bool enqueue(U&&... element) + template + inline bool enqueue(Args&&... args) { index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed); index_t newTailIndex = 1 + currentTailIndex; @@ -1942,11 +1942,11 @@ class ConcurrentQueue ++pr_blockIndexSlotsUsed; } - MOODYCAMEL_CONSTEXPR_IF (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (static_cast(nullptr)) T(std::forward(element)...))) { + MOODYCAMEL_CONSTEXPR_IF (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (static_cast(nullptr)) T(std::forward(args)...))) { // The constructor may throw. We want the element not to appear in the queue in // that case (without corrupting the queue): MOODYCAMEL_TRY { - new ((*this->tailBlock)[currentTailIndex]) T(std::forward(element)...); + new ((*this->tailBlock)[currentTailIndex]) T(std::forward(args)...); } MOODYCAMEL_CATCH (...) { // Revert change to the current block, but leave the new block available @@ -1968,14 +1968,14 @@ class ConcurrentQueue blockIndex.load(std::memory_order_relaxed)->front.store(pr_blockIndexFront, std::memory_order_release); pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1); - MOODYCAMEL_CONSTEXPR_IF (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (static_cast(nullptr)) T(std::forward(element)...))) { + MOODYCAMEL_CONSTEXPR_IF (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (static_cast(nullptr)) T(std::forward(args)...))) { this->tailIndex.store(newTailIndex, std::memory_order_release); return true; } } // Enqueue - new ((*this->tailBlock)[currentTailIndex]) T(std::forward(element)...); + new ((*this->tailBlock)[currentTailIndex]) T(std::forward(args)...); this->tailIndex.store(newTailIndex, std::memory_order_release); return true; @@ -2513,8 +2513,8 @@ class ConcurrentQueue } } - template - inline bool enqueue(U&&... element) + template + inline bool enqueue(Args&&... args) { index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed); index_t newTailIndex = 1 + currentTailIndex; @@ -2546,10 +2546,10 @@ class ConcurrentQueue #endif newBlock->ConcurrentQueue::Block::template reset_empty(); - MOODYCAMEL_CONSTEXPR_IF (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (static_cast(nullptr)) T(std::forward(element)...))) { + MOODYCAMEL_CONSTEXPR_IF (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (static_cast(nullptr)) T(std::forward(args)...))) { // May throw, try to insert now before we publish the fact that we have this new block MOODYCAMEL_TRY { - new ((*newBlock)[currentTailIndex]) T(std::forward(element)...); + new ((*newBlock)[currentTailIndex]) T(std::forward(args)...); } MOODYCAMEL_CATCH (...) { rewind_block_index_tail(); @@ -2564,14 +2564,14 @@ class ConcurrentQueue this->tailBlock = newBlock; - MOODYCAMEL_CONSTEXPR_IF (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (static_cast(nullptr)) T(std::forward(element)...))) { + MOODYCAMEL_CONSTEXPR_IF (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (static_cast(nullptr)) T(std::forward(args)...))) { this->tailIndex.store(newTailIndex, std::memory_order_release); return true; } } // Enqueue - new ((*this->tailBlock)[currentTailIndex]) T(std::forward(element)...); + new ((*this->tailBlock)[currentTailIndex]) T(std::forward(args)...); this->tailIndex.store(newTailIndex, std::memory_order_release); return true;