diff --git a/hphp/runtime/ext/asio/asio-blockable.cpp b/hphp/runtime/ext/asio/asio-blockable.cpp index ad25c9ecec161..7c885773889f5 100644 --- a/hphp/runtime/ext/asio/asio-blockable.cpp +++ b/hphp/runtime/ext/asio/asio-blockable.cpp @@ -166,29 +166,35 @@ c_WaitableWaitHandle* AsioBlockable::getWaitHandle() const { } void AsioBlockableChain::unblock() { - while (auto cur = m_lastParent) { - m_lastParent = cur->getPrevParent(); - cur->updatePrevParent(nullptr); - // the onUnblocked handler may free cur - switch (cur->getKind()) { - case Kind::AsyncFunctionWaitHandleNode: - getAsyncFunctionWaitHandleNode(cur)->onUnblocked(); - break; - case Kind::AsyncGeneratorWaitHandle: - getAsyncGeneratorWaitHandle(cur)->onUnblocked(); - break; - case Kind::AwaitAllWaitHandleNode: - getAwaitAllWaitHandleNode(cur)->onUnblocked(); - break; - case Kind::ConcurrentWaitHandleNode: - getConcurrentWaitHandleNode(cur)->onUnblocked(); - break; - case Kind::ConditionWaitHandle: - getConditionWaitHandle(cur)->onUnblocked(); - break; - case Kind::PriorityBridgeWaitHandle: - getPriorityBridgeWaitHandle(cur)->onUnblocked(); - break; + std::vector worklist = { *this }; + while (!worklist.empty()) { + auto const lastParent = worklist.back().m_lastParent; + worklist.pop_back(); + + for (AsioBlockable* cur = lastParent, *next; cur; cur = next) { + next = cur->getPrevParent(); + cur->updatePrevParent(nullptr); + // the onUnblocked handler may free cur + switch (cur->getKind()) { + case Kind::AsyncFunctionWaitHandleNode: + getAsyncFunctionWaitHandleNode(cur)->onUnblocked(); + break; + case Kind::AsyncGeneratorWaitHandle: + getAsyncGeneratorWaitHandle(cur)->onUnblocked(); + break; + case Kind::AwaitAllWaitHandleNode: + getAwaitAllWaitHandleNode(cur)->onUnblocked(worklist); + break; + case Kind::ConcurrentWaitHandleNode: + getConcurrentWaitHandleNode(cur)->onUnblocked(worklist); + break; + case Kind::ConditionWaitHandle: + getConditionWaitHandle(cur)->onUnblocked(worklist); + break; + case Kind::PriorityBridgeWaitHandle: + getPriorityBridgeWaitHandle(cur)->onUnblocked(worklist); + break; + } } } } diff --git a/hphp/runtime/ext/asio/ext_await-all-wait-handle.cpp b/hphp/runtime/ext/asio/ext_await-all-wait-handle.cpp index 0de3267122e40..c3534d4a157c6 100644 --- a/hphp/runtime/ext/asio/ext_await-all-wait-handle.cpp +++ b/hphp/runtime/ext/asio/ext_await-all-wait-handle.cpp @@ -149,7 +149,7 @@ void c_AwaitAllWaitHandle::initialize(ContextStateIndex ctxStateIdx) { } } -void c_AwaitAllWaitHandle::onUnblocked(uint32_t idx) { +void c_AwaitAllWaitHandle::onUnblocked(uint32_t idx, std::vector& worklist) { assertx(idx <= m_unfinished); assertx(getState() == STATE_BLOCKED); @@ -166,25 +166,25 @@ void c_AwaitAllWaitHandle::onUnblocked(uint32_t idx) { } catch (const Object& cycle_exception) { assertx(cycle_exception->instanceof(SystemLib::getThrowableClass())); throwable_recompute_backtrace_from_wh(cycle_exception.get(), this); - markAsFailed(cycle_exception); + markAsFailed(cycle_exception, worklist); } return; } } // All children finished. - markAsFinished(); + markAsFinished(worklist); } } -void c_AwaitAllWaitHandle::markAsFinished() { +void c_AwaitAllWaitHandle::markAsFinished(std::vector& worklist) { auto parentChain = getParentChain(); setState(STATE_SUCCEEDED); tvWriteNull(m_resultOrException); - parentChain.unblock(); + worklist.emplace_back(std::move(parentChain)); decRefObj(this); } -void c_AwaitAllWaitHandle::markAsFailed(const Object& exception) { +void c_AwaitAllWaitHandle::markAsFailed(const Object& exception, std::vector& worklist) { for (uint32_t idx = 0; idx < m_cap; idx++) { auto const child = m_children[idx].m_child; if (!child->isFinished()) { @@ -196,7 +196,7 @@ void c_AwaitAllWaitHandle::markAsFailed(const Object& exception) { auto parentChain = getParentChain(); setState(STATE_FAILED); tvWriteObject(exception.get(), &m_resultOrException); - parentChain.unblock(); + worklist.emplace_back(std::move(parentChain)); decRefObj(this); } diff --git a/hphp/runtime/ext/asio/ext_await-all-wait-handle.h b/hphp/runtime/ext/asio/ext_await-all-wait-handle.h index 740b7fb387daf..43037888fffad 100644 --- a/hphp/runtime/ext/asio/ext_await-all-wait-handle.h +++ b/hphp/runtime/ext/asio/ext_await-all-wait-handle.h @@ -17,6 +17,8 @@ #pragma once +#include + #include "hphp/runtime/base/type-array.h" #include "hphp/runtime/base/type-object.h" #include "hphp/runtime/ext/asio/ext_waitable-wait-handle.h" @@ -37,9 +39,28 @@ struct c_AwaitAllWaitHandle final : c_WaitableWaitHandle, static void instanceDtor(ObjectData* obj, const Class*) { auto wh = wait_handle(obj); - auto const sz = wh->heapSize(); - wh->~c_AwaitAllWaitHandle(); - tl_heap->objFree(obj, sz); + + std::vector queue = {wh}; + for (std::size_t i = 0; i < queue.size(); i++) { + auto cur = queue[i]; + for (int32_t j = 0; j < cur->m_cap; j++) { + auto cur_child = cur->m_children[j].m_child; + assertx(isFailed() || cur_child->isFinished()); + + if (cur_child->getKind() == Kind::AwaitAll) { + if (cur_child->decReleaseCheck()) { + queue.push_back(cur_child->asAwaitAll()); + } + } else { + decRefObj(cur_child); + } + } + } + + for (auto& cur : queue) { + auto const sz = cur->heapSize(); + tl_heap->objFree(cur, sz); + } } explicit c_AwaitAllWaitHandle(unsigned cap = 0) @@ -79,8 +100,8 @@ struct c_AwaitAllWaitHandle final : c_WaitableWaitHandle, return getChildIdx() == getWaitHandle()->m_unfinished; } - void onUnblocked() { - getWaitHandle()->onUnblocked(getChildIdx()); + void onUnblocked(std::vector& worklist) { + getWaitHandle()->onUnblocked(getChildIdx(), worklist); } AsioBlockable m_blockable; @@ -93,7 +114,7 @@ struct c_AwaitAllWaitHandle final : c_WaitableWaitHandle, } String getName(); - void onUnblocked(uint32_t idx); + void onUnblocked(uint32_t idx, std::vector& worklist); c_WaitableWaitHandle* getChild(); template void forEachChild(T fn); @@ -108,8 +129,8 @@ struct c_AwaitAllWaitHandle final : c_WaitableWaitHandle, static Object Create(Iter iter); static req::ptr Alloc(int32_t cnt); void initialize(ContextStateIndex ctxStateIdx); - void markAsFinished(void); - void markAsFailed(const Object& exception); + void markAsFinished(std::vector& worklist); + void markAsFailed(const Object& exception, std::vector& worklist); void setState(uint8_t state) { setKindState(Kind::AwaitAll, state); } // Construct an AAWH from an array-like without making layout assumptions. diff --git a/hphp/runtime/ext/asio/ext_concurrent-wait-handle.cpp b/hphp/runtime/ext/asio/ext_concurrent-wait-handle.cpp index 8a1a6241135f3..20e09c20c49ea 100644 --- a/hphp/runtime/ext/asio/ext_concurrent-wait-handle.cpp +++ b/hphp/runtime/ext/asio/ext_concurrent-wait-handle.cpp @@ -101,7 +101,7 @@ void c_ConcurrentWaitHandle::initialize(ContextStateIndex ctxStateIdx) { } } -void c_ConcurrentWaitHandle::onUnblocked(uint32_t idx) { +void c_ConcurrentWaitHandle::onUnblocked(uint32_t idx, std::vector& worklist) { assertx(idx <= m_unfinished); assertx(getState() == STATE_BLOCKED); @@ -118,25 +118,25 @@ void c_ConcurrentWaitHandle::onUnblocked(uint32_t idx) { } catch (const Object& cycle_exception) { assertx(cycle_exception->instanceof(SystemLib::getThrowableClass())); throwable_recompute_backtrace_from_wh(cycle_exception.get(), this); - markAsFailed(cycle_exception); + markAsFailed(cycle_exception, worklist); } return; } } // All children finished. - markAsFinished(); + markAsFinished(worklist); } } -void c_ConcurrentWaitHandle::markAsFinished() { +void c_ConcurrentWaitHandle::markAsFinished(std::vector& worklist) { auto parentChain = getParentChain(); setState(STATE_SUCCEEDED); tvWriteNull(m_resultOrException); - parentChain.unblock(); + worklist.emplace_back(std::move(parentChain)); decRefObj(this); } -void c_ConcurrentWaitHandle::markAsFailed(const Object& exception) { +void c_ConcurrentWaitHandle::markAsFailed(const Object& exception, std::vector& worklist) { for (uint32_t idx = 0; idx < m_cap; idx++) { auto const child = m_children[idx].m_child; if (!child->isFinished()) { @@ -148,7 +148,7 @@ void c_ConcurrentWaitHandle::markAsFailed(const Object& exception) { auto parentChain = getParentChain(); setState(STATE_FAILED); tvWriteObject(exception.get(), &m_resultOrException); - parentChain.unblock(); + worklist.emplace_back(std::move(parentChain)); decRefObj(this); } diff --git a/hphp/runtime/ext/asio/ext_concurrent-wait-handle.h b/hphp/runtime/ext/asio/ext_concurrent-wait-handle.h index a70c3e0108171..1c8f6d99e023f 100644 --- a/hphp/runtime/ext/asio/ext_concurrent-wait-handle.h +++ b/hphp/runtime/ext/asio/ext_concurrent-wait-handle.h @@ -17,6 +17,8 @@ #pragma once +#include + #include "hphp/runtime/base/type-array.h" #include "hphp/runtime/base/type-object.h" #include "hphp/runtime/ext/asio/ext_waitable-wait-handle.h" @@ -37,9 +39,28 @@ struct c_ConcurrentWaitHandle final : using SystemLib::ClassLoader<"HH\\ConcurrentWaitHandle">::className; static void instanceDtor(ObjectData* obj, const Class*) { auto wh = wait_handle(obj); - auto const sz = wh->heapSize(); - wh->~c_ConcurrentWaitHandle(); - tl_heap->objFree(obj, sz); + + std::vector queue = {wh}; + for (std::size_t i = 0; i < queue.size(); i++) { + auto cur = queue[i]; + for (int32_t j = 0; j < cur->m_cap; j++) { + auto cur_child = cur->m_children[j].m_child; + assertx(isFailed() || cur_child->isFinished()); + + if (cur_child->getKind() == Kind::Concurrent) { + if (cur_child->decReleaseCheck()) { + queue.push_back(cur_child->asConcurrent()); + } + } else { + decRefObj(cur_child); + } + } + } + + for (auto& cur : queue) { + auto const sz = cur->heapSize(); + tl_heap->objFree(cur, sz); + } } explicit c_ConcurrentWaitHandle(unsigned cap = 0) @@ -86,8 +107,8 @@ struct c_ConcurrentWaitHandle final : return getChildIdx() == getWaitHandle()->m_unfinished; } - void onUnblocked() { - getWaitHandle()->onUnblocked(getChildIdx()); + void onUnblocked(std::vector& worklist) { + getWaitHandle()->onUnblocked(getChildIdx(), worklist); } AsioBlockable m_blockable; @@ -100,7 +121,7 @@ struct c_ConcurrentWaitHandle final : } String getName(); - void onUnblocked(uint32_t idx); + void onUnblocked(uint32_t idx, std::vector& worklist); c_WaitableWaitHandle* getChild(); template void forEachChild(T fn); @@ -113,8 +134,8 @@ struct c_ConcurrentWaitHandle final : private: static req::ptr Alloc(int32_t cnt); void initialize(ContextStateIndex ctxStateIdx); - void markAsFinished(void); - void markAsFailed(const Object& exception); + void markAsFinished(std::vector& worklist); + void markAsFailed(const Object& exception, std::vector& worklist); void setState(uint8_t state) { setKindState(Kind::Concurrent, state); } private: diff --git a/hphp/runtime/ext/asio/ext_condition-wait-handle.cpp b/hphp/runtime/ext/asio/ext_condition-wait-handle.cpp index fbc125c48c72e..4ada943d95d52 100644 --- a/hphp/runtime/ext/asio/ext_condition-wait-handle.cpp +++ b/hphp/runtime/ext/asio/ext_condition-wait-handle.cpp @@ -126,7 +126,7 @@ void c_ConditionWaitHandle::initialize(c_WaitableWaitHandle* child) { } } -void c_ConditionWaitHandle::onUnblocked() { +void c_ConditionWaitHandle::onUnblocked(std::vector& worklist) { decRefObj(m_child); m_child = nullptr; @@ -142,7 +142,7 @@ void c_ConditionWaitHandle::onUnblocked() { make_tv(getNotNotifiedException().detach()), m_resultOrException ); - parentChain.unblock(); + worklist.emplace_back(std::move(parentChain)); decRefObj(this); } diff --git a/hphp/runtime/ext/asio/ext_condition-wait-handle.h b/hphp/runtime/ext/asio/ext_condition-wait-handle.h index bb49388396cbd..fd180da215dbf 100644 --- a/hphp/runtime/ext/asio/ext_condition-wait-handle.h +++ b/hphp/runtime/ext/asio/ext_condition-wait-handle.h @@ -18,6 +18,8 @@ #ifndef incl_HPHP_EXT_ASIO_CONDITION_WAIT_HANDLE_H_ #define incl_HPHP_EXT_ASIO_CONDITION_WAIT_HANDLE_H_ +#include + #include "hphp/runtime/base/vanilla-dict.h" #include "hphp/runtime/ext/asio/ext_waitable-wait-handle.h" #include "hphp/runtime/ext/extension.h" @@ -48,7 +50,7 @@ struct c_ConditionWaitHandle final : } String getName(); - void onUnblocked(); + void onUnblocked(std::vector& worklist); c_WaitableWaitHandle* getChild(); static const int8_t STATE_BLOCKED = 2; diff --git a/hphp/runtime/ext/asio/ext_priority-bridge-wait-handle.cpp b/hphp/runtime/ext/asio/ext_priority-bridge-wait-handle.cpp index 29cc0962b44c9..017da518da238 100644 --- a/hphp/runtime/ext/asio/ext_priority-bridge-wait-handle.cpp +++ b/hphp/runtime/ext/asio/ext_priority-bridge-wait-handle.cpp @@ -80,7 +80,7 @@ void c_PriorityBridgeWaitHandle::initialize(c_WaitableWaitHandle* child) { } } -void c_PriorityBridgeWaitHandle::onUnblocked() { +void c_PriorityBridgeWaitHandle::onUnblocked(std::vector& worklist) { auto parentChain = getParentChain(); // Propagate the child's result. @@ -90,7 +90,7 @@ void c_PriorityBridgeWaitHandle::onUnblocked() { decRefObj(m_child); m_child = nullptr; - parentChain.unblock(); + worklist.emplace_back(std::move(parentChain)); decRefObj(this); } diff --git a/hphp/runtime/ext/asio/ext_priority-bridge-wait-handle.h b/hphp/runtime/ext/asio/ext_priority-bridge-wait-handle.h index f7dbe11086ce9..951f7b7c35404 100644 --- a/hphp/runtime/ext/asio/ext_priority-bridge-wait-handle.h +++ b/hphp/runtime/ext/asio/ext_priority-bridge-wait-handle.h @@ -17,6 +17,8 @@ #pragma once +#include + #include "hphp/runtime/ext/asio/ext_waitable-wait-handle.h" #include "hphp/runtime/ext/asio/ext_resumable-wait-handle.h" @@ -49,7 +51,7 @@ struct c_PriorityBridgeWaitHandle final } String getName(); - void onUnblocked(); + void onUnblocked(std::vector& worklist); c_WaitableWaitHandle* getChild(); // Prioritize the child of this PBWH. Will lift the child into the object's diff --git a/hphp/test/slow/async/long_awaitall_chain.php b/hphp/test/slow/async/long_awaitall_chain.php new file mode 100644 index 0000000000000..35b5b5ee1f5d9 --- /dev/null +++ b/hphp/test/slow/async/long_awaitall_chain.php @@ -0,0 +1,29 @@ + { + $s = new \HH\Lib\Async\Semaphore(10, async (bool $should_wait) ==> { + if ($should_wait) { + await Asio\usleep(20000*1000); + } else { + await Asio\later(); + } + }); + + concurrent { + await $s->waitForAsync(true); + await async { + for ($i = 0; $i < 1000000; $i++) { + await $s->waitForAsync(false); + } + + echo "$i\n"; + }; + } +} + +<<__EntryPoint>> +function main(): void { + Asio\join(bar()); +} diff --git a/hphp/test/slow/async/long_awaitall_chain.php.expect b/hphp/test/slow/async/long_awaitall_chain.php.expect new file mode 100644 index 0000000000000..749fce669df1b --- /dev/null +++ b/hphp/test/slow/async/long_awaitall_chain.php.expect @@ -0,0 +1 @@ +1000000