// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

namespace System.Threading
{
    /// <summary>
    /// Exponential backoff used in rare cases when we need to change state atomically and cannot
    /// make progress due to concurrent state changes by other threads.
    /// While we cannot know the ideal amount of wait needed before making a successful attempt,
    /// the exponential backoff will generally be not more than 2X worse than the perfect guess and
    /// will do a lot fewer attempts than a simple retry. On a multiprocessor machine fruitless attempts
    /// will cause unnecessary sharing of the contended state which may make modifying the state more expensive.
    /// To protect against degenerate cases we cap the per-iteration wait to a few thousand spinwaits.
    /// </summary>
    internal static class Backoff
    {
        // Caps the exponent so a single iteration never spins more than roughly 2^14 (16K) spinwaits.
        private const uint MaxExponentialBackoffBits = 14;

        /// <summary>
        /// Spin-waits for a pseudorandom number of spins that roughly grows as 2^<paramref name="attempt"/>.
        /// </summary>
        /// <param name="attempt">Zero-based retry count; clamped to <see cref="MaxExponentialBackoffBits"/>.</param>
        internal static unsafe void Exponential(uint attempt)
        {
            attempt = Math.Min(attempt, MaxExponentialBackoffBits);

            // No need for much randomness here; randomness is "good to have", we could do without it,
            // so we just cheaply hash in the stack location of the argument (Fibonacci hashing).
            uint rand = (uint)&attempt * 2654435769u;

            // Set the topmost bit to ensure the minimum number of spins is exponentially increasing;
            // it basically guarantees that we spin at least 0, 1, 2, 4, 8, 16 times, and so on.
            rand |= (1u << 31);

            // BUGFIX: the original `rand >> (32 - attempt)` is wrong for attempt == 0 because C#
            // masks uint shift counts to 5 bits (32 & 31 == 0), so the "shift by 32" became a
            // shift by 0 and produced ~2^31..2^32 spins on the very first backoff instead of 0.
            // Shifting in two steps is identical for attempt >= 1 and correctly yields 0 for attempt == 0.
            uint spins = (rand >> 1) >> (int)(31 - attempt);

            Thread.SpinWait((int)spins);
        }
    }
}
maximumSignalCount; _spinCount = spinCount; _onWait = onWait; @@ -37,7 +33,7 @@ public LowLevelLifoSemaphore(int initialSignalCount, int maximumSignalCount, int Create(maximumSignalCount); } - public bool Wait(int timeoutMs, bool spinWait) + public bool Wait(int timeoutMs) { Debug.Assert(timeoutMs >= -1); @@ -45,86 +41,59 @@ public bool Wait(int timeoutMs, bool spinWait) Thread.AssureBlockingPossible(); #endif - int spinCount = spinWait ? _spinCount : 0; - - // Try to acquire the semaphore or - // a) register as a spinner if spinCount > 0 and timeoutMs > 0 - // b) register as a waiter if there's already too many spinners or spinCount == 0 and timeoutMs > 0 - // c) bail out if timeoutMs == 0 and return false + // Try one-shot acquire first Counts counts = _separated._counts; - while (true) + if (counts.SignalCount != 0) { - Debug.Assert(counts.SignalCount <= _maximumSignalCount); Counts newCounts = counts; - if (counts.SignalCount != 0) - { - newCounts.DecrementSignalCount(); - } - else if (timeoutMs != 0) - { - if (spinCount > 0 && newCounts.SpinnerCount < byte.MaxValue) - { - newCounts.IncrementSpinnerCount(); - } - else - { - // Maximum number of spinners reached, register as a waiter instead - newCounts.IncrementWaiterCount(); - } - } - + newCounts.DecrementSignalCount(); Counts countsBeforeUpdate = _separated._counts.InterlockedCompareExchange(newCounts, counts); if (countsBeforeUpdate == counts) { - if (counts.SignalCount != 0) - { - return true; - } - if (newCounts.WaiterCount != counts.WaiterCount) - { - return WaitForSignal(timeoutMs); - } - if (timeoutMs == 0) - { - return false; - } - break; + // we've consumed a signal + return true; } - - counts = countsBeforeUpdate; } - bool isSingleProcessor = Environment.IsSingleProcessor; - int spinIndex = isSingleProcessor ? SpinSleep0Threshold : 0; - while (spinIndex < spinCount) + return WaitSlow(timeoutMs); + } + + private bool WaitSlow(int timeoutMs) + { + // Now spin briefly with exponential backoff. 
+ // We use random exponential backoff because: + // - we do not know how soon a signal appears, but with exponential backoff we will not be more than 2x off the ideal guess + // - it gives mild preference to the most recent spinners. We want LIFO here so that hot(er) threads keep running. + // - it is possible that spinning workers prevent non-pool threads from submitting more work to the pool, + // so we want some workers to sleep earlier than others. + uint spinCount = Environment.IsSingleProcessor ? 0 : _spinCount; + for (uint iteration = 0; iteration < spinCount; iteration++) { - LowLevelSpinWaiter.Wait(spinIndex, SpinSleep0Threshold, isSingleProcessor); - spinIndex++; + Backoff.Exponential(iteration); - // Try to acquire the semaphore and unregister as a spinner - counts = _separated._counts; - while (counts.SignalCount > 0) + Counts counts = _separated._counts; + if (counts.SignalCount != 0) { Counts newCounts = counts; newCounts.DecrementSignalCount(); - newCounts.DecrementSpinnerCount(); - Counts countsBeforeUpdate = _separated._counts.InterlockedCompareExchange(newCounts, counts); if (countsBeforeUpdate == counts) { + // we've consumed a signal return true; } - - counts = countsBeforeUpdate; } } - // Unregister as spinner, and acquire the semaphore or register as a waiter - counts = _separated._counts; + // Now we will try registering as a waiter and wait. + // If signaled before that, we have to acquire as this can be the last thread that could take that signal. + // The difference with spinning above is that we are not waiting for a signal. We should immediately succeed + // unless a lot of threads are trying to update the counts. Thus we use a different attempt counter. 
+ uint collisionCount = 0; while (true) { + Counts counts = _separated._counts; Counts newCounts = counts; - newCounts.DecrementSpinnerCount(); if (counts.SignalCount != 0) { newCounts.DecrementSignalCount(); @@ -140,7 +109,7 @@ public bool Wait(int timeoutMs, bool spinWait) return counts.SignalCount != 0 || WaitForSignal(timeoutMs); } - counts = countsBeforeUpdate; + Backoff.Exponential(collisionCount++); } } @@ -162,130 +131,113 @@ private bool WaitForSignal(int timeoutMs) } int endWaitTicks = timeoutMs != -1 ? Environment.TickCount : 0; - // Unregister the waiter if this thread will not be waiting anymore, and try to acquire the semaphore - Counts counts = _separated._counts; + uint collisionCount = 0; while (true) { - Debug.Assert(counts.WaiterCount != 0); + Counts counts = _separated._counts; Counts newCounts = counts; - if (counts.SignalCount != 0) + + Debug.Assert(counts.WaiterCount != 0); + Debug.Assert(counts.CountOfWaitersSignaledToWake != 0); + + newCounts.DecrementCountOfWaitersSignaledToWake(); + if (newCounts.SignalCount != 0) { newCounts.DecrementSignalCount(); newCounts.DecrementWaiterCount(); } - // This waiter has woken up and this needs to be reflected in the count of waiters signaled to wake - if (counts.CountOfWaitersSignaledToWake != 0) - { - newCounts.DecrementCountOfWaitersSignaledToWake(); - } - Counts countsBeforeUpdate = _separated._counts.InterlockedCompareExchange(newCounts, counts); if (countsBeforeUpdate == counts) { if (counts.SignalCount != 0) { + // success return true; } + + // we've consumed a wake, but there was no signal, we will wait again. break; } - counts = countsBeforeUpdate; - if (timeoutMs != -1) { - int waitMs = endWaitTicks - startWaitTicks; - if (waitMs >= 0 && waitMs < timeoutMs) - timeoutMs -= waitMs; - else - timeoutMs = 0; - } + // collision, try again. 
+ Backoff.Exponential(collisionCount++); + } + + // we will wait again, reduce timeout + if (timeoutMs != -1) + { + int waitMs = endWaitTicks - startWaitTicks; + if (waitMs >= 0 && waitMs < timeoutMs) + timeoutMs -= waitMs; + else + timeoutMs = 0; } } } - public void Release(int releaseCount) + public void Signal() { - Debug.Assert(releaseCount > 0); - Debug.Assert(releaseCount <= _maximumSignalCount); + // Increment signal count. This enables one-shot acquire. + Counts counts = _separated._counts.InterlockedIncrementSignalCount(); - int countOfWaitersToWake; - Counts counts = _separated._counts; + // Now check if waiters need to be woken + uint collisionCount = 0; while (true) { - Counts newCounts = counts; - - // Increase the signal count. The addition doesn't overflow because of the limit on the max signal count in constructor. - newCounts.AddSignalCount((uint)releaseCount); - - // Determine how many waiters to wake, taking into account how many spinners and waiters there are and how many waiters - // have previously been signaled to wake but have not yet woken - countOfWaitersToWake = - (int)Math.Min(newCounts.SignalCount, (uint)counts.WaiterCount + counts.SpinnerCount) - - counts.SpinnerCount - - counts.CountOfWaitersSignaledToWake; - if (countOfWaitersToWake > 0) + // Determine how many waiters to wake. + // The number of wakes should not be more than the signal count, not more than waiter count and discount any pending wakes. + int countOfWaitersToWake = (int)Math.Min(counts.SignalCount, counts.WaiterCount) - counts.CountOfWaitersSignaledToWake; + if (countOfWaitersToWake <= 0) { - // Ideally, limiting to a maximum of releaseCount would not be necessary and could be an assert instead, but since - // WaitForSignal() does not have enough information to tell whether a woken thread was signaled, and due to the cap - // below, it's possible for countOfWaitersSignaledToWake to be less than the number of threads that have actually - // been signaled to wake. 
- if (countOfWaitersToWake > releaseCount) - { - countOfWaitersToWake = releaseCount; - } - - // Cap countOfWaitersSignaledToWake to its max value. It's ok to ignore some woken threads in this count, it just - // means some more threads will be woken next time. Typically, it won't reach the max anyway. - newCounts.AddUpToMaxCountOfWaitersSignaledToWake((uint)countOfWaitersToWake); + // No waiters to wake. This is the most common case. + return; } + Counts newCounts = counts; + newCounts.AddCountOfWaitersSignaledToWake((uint)countOfWaitersToWake); Counts countsBeforeUpdate = _separated._counts.InterlockedCompareExchange(newCounts, counts); if (countsBeforeUpdate == counts) { - Debug.Assert(releaseCount <= _maximumSignalCount - counts.SignalCount); + Debug.Assert(_maximumSignalCount - counts.SignalCount >= 1); if (countOfWaitersToWake > 0) ReleaseCore(countOfWaitersToWake); return; } - counts = countsBeforeUpdate; + // collision, try again. + Backoff.Exponential(collisionCount++); + + counts = _separated._counts; } } private struct Counts : IEquatable { private const byte SignalCountShift = 0; - private const byte WaiterCountShift = 32; - private const byte SpinnerCountShift = 48; - private const byte CountOfWaitersSignaledToWakeShift = 56; + private const byte WaiterCountShift = 16; + private const byte CountOfWaitersSignaledToWakeShift = 32; private ulong _data; private Counts(ulong data) => _data = data; - private uint GetUInt32Value(byte shift) => (uint)(_data >> shift); - private void SetUInt32Value(uint value, byte shift) => - _data = (_data & ~((ulong)uint.MaxValue << shift)) | ((ulong)value << shift); private ushort GetUInt16Value(byte shift) => (ushort)(_data >> shift); private void SetUInt16Value(ushort value, byte shift) => _data = (_data & ~((ulong)ushort.MaxValue << shift)) | ((ulong)value << shift); - private byte GetByteValue(byte shift) => (byte)(_data >> shift); - private void SetByteValue(byte value, byte shift) => - _data = (_data & 
~((ulong)byte.MaxValue << shift)) | ((ulong)value << shift); - public uint SignalCount + public ushort SignalCount { - get => GetUInt32Value(SignalCountShift); - set => SetUInt32Value(value, SignalCountShift); + get => GetUInt16Value(SignalCountShift); } - public void AddSignalCount(uint value) + public Counts InterlockedIncrementSignalCount() { - Debug.Assert(value <= uint.MaxValue - SignalCount); - _data += (ulong)value << SignalCountShift; + var countsAfterUpdate = new Counts(Interlocked.Add(ref _data, 1ul << SignalCountShift)); + Debug.Assert(countsAfterUpdate.SignalCount != ushort.MaxValue); // overflow check + return countsAfterUpdate; } - public void IncrementSignalCount() => AddSignalCount(1); - public void DecrementSignalCount() { Debug.Assert(SignalCount != 0); @@ -295,19 +247,18 @@ public void DecrementSignalCount() public ushort WaiterCount { get => GetUInt16Value(WaiterCountShift); - set => SetUInt16Value(value, WaiterCountShift); } - public void IncrementWaiterCount() + public void DecrementWaiterCount() { - Debug.Assert(WaiterCount < ushort.MaxValue); - _data += (ulong)1 << WaiterCountShift; + Debug.Assert(WaiterCount != 0); + _data -= (ulong)1 << WaiterCountShift; } - public void DecrementWaiterCount() + public void IncrementWaiterCount() { + _data += (ulong)1 << WaiterCountShift; Debug.Assert(WaiterCount != 0); - _data -= (ulong)1 << WaiterCountShift; } public void InterlockedDecrementWaiterCount() @@ -316,38 +267,16 @@ public void InterlockedDecrementWaiterCount() Debug.Assert(countsAfterUpdate.WaiterCount != ushort.MaxValue); // underflow check } - public byte SpinnerCount + public ushort CountOfWaitersSignaledToWake { - get => GetByteValue(SpinnerCountShift); - set => SetByteValue(value, SpinnerCountShift); + get => GetUInt16Value(CountOfWaitersSignaledToWakeShift); } - public void IncrementSpinnerCount() + public void AddCountOfWaitersSignaledToWake(uint value) { - Debug.Assert(SpinnerCount < byte.MaxValue); - _data += (ulong)1 << 
SpinnerCountShift; - } - - public void DecrementSpinnerCount() - { - Debug.Assert(SpinnerCount != 0); - _data -= (ulong)1 << SpinnerCountShift; - } - - public byte CountOfWaitersSignaledToWake - { - get => GetByteValue(CountOfWaitersSignaledToWakeShift); - set => SetByteValue(value, CountOfWaitersSignaledToWakeShift); - } - - public void AddUpToMaxCountOfWaitersSignaledToWake(uint value) - { - uint availableCount = (uint)(byte.MaxValue - CountOfWaitersSignaledToWake); - if (value > availableCount) - { - value = availableCount; - } _data += (ulong)value << CountOfWaitersSignaledToWakeShift; + var countsAfterUpdate = new Counts(_data); + Debug.Assert(countsAfterUpdate.CountOfWaitersSignaledToWake != ushort.MaxValue); // overflow check } public void DecrementCountOfWaitersSignaledToWake() diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.Blocking.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.Blocking.cs index 73fe2afe9a4d0e..bd6773321374b9 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.Blocking.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.Blocking.cs @@ -230,7 +230,8 @@ private uint PerformBlockingAdjustment(bool previousDelayElapsed, out bool addWo HillClimbing.ThreadPoolHillClimber.ForceChange( newNumThreadsGoal, HillClimbing.StateOrTransition.CooperativeBlocking); - if (counts.NumProcessingWork >= numThreadsGoal && _separated.numRequestedWorkers > 0) + + if (counts.NumProcessingWork >= numThreadsGoal && _separated._hasOutstandingThreadRequest != 0) { addWorker = true; } diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.GateThread.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.GateThread.cs index a0434cdfa9abb3..e545b660908538 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.GateThread.cs +++ 
b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.GateThread.cs @@ -132,7 +132,7 @@ private static void GateThreadStart() if (!disableStarvationDetection && threadPoolInstance._pendingBlockingAdjustment == PendingBlockingAdjustment.None && - threadPoolInstance._separated.numRequestedWorkers > 0 && + threadPoolInstance._separated._hasOutstandingThreadRequest != 0 && SufficientDelaySinceLastDequeue(threadPoolInstance)) { bool addWorker = false; @@ -187,7 +187,7 @@ private static void GateThreadStart() } } - if (threadPoolInstance._separated.numRequestedWorkers <= 0 && + if (threadPoolInstance._separated._hasOutstandingThreadRequest == 0 && threadPoolInstance._pendingBlockingAdjustment == PendingBlockingAdjustment.None && Interlocked.Decrement(ref threadPoolInstance._separated.gateThreadRunningState) <= GetRunningStateForNumRuns(0)) { @@ -208,7 +208,7 @@ public static void Wake(PortableThreadPool threadPoolInstance) // in deciding "too long" private static bool SufficientDelaySinceLastDequeue(PortableThreadPool threadPoolInstance) { - uint delay = (uint)(Environment.TickCount - threadPoolInstance._separated.lastDequeueTime); + uint delay = (uint)(Environment.TickCount - threadPoolInstance._separated.lastDispatchTime); uint minimumDelay; if (threadPoolInstance._cpuUtilization < CpuUtilizationLow) { diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.ThreadCounts.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.ThreadCounts.cs index 26b6ab0cf0ac48..3d3ed4123ae25e 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.ThreadCounts.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.ThreadCounts.cs @@ -44,6 +44,65 @@ public short NumProcessingWork } } + // Returns "true" if adding NumProcessingWork has reached the limit. 
+ // NOTE: it is possible to have overflow and NumProcessingWork under the limit + // at the same time if the limit has been changed afterwards. That is ok. + // While changes in NumProcessingWork need to be matched with semaphore Wait/Signal, + // the redundantly set overflow is mostly harmless and should self-correct when + // a worker that sees no work calls TryDecrementProcessingWork, possibly at a cost of + // redundant check for work. + public bool IsOverflow + { + get + { + return (long)_data < 0; + } + } + + /// + /// Tries to increase the number of threads processing work items by one. + /// If at or above goal, returns false and sets overflow flag instead. + /// NOTE: only if "true" is returned the NumProcessingWork is incremented. + /// + public bool TryIncrementProcessingWork() + { + Debug.Assert(NumProcessingWork >= 0); + if (NumProcessingWork < NumThreadsGoal) + { + NumProcessingWork++; + // This should never overflow + Debug.Assert(NumProcessingWork > 0); + return true; + } + else + { + _data |= (1ul << 63); + return false; + } + } + + /// + /// Tries to reduce the number of threads processing work items by one. + /// If in an overflow state, clears the overflow flag and returns false. + /// NOTE: only if "true" is returned the NumProcessingWork is decremented. + /// + public bool TryDecrementProcessingWork() + { + Debug.Assert(NumProcessingWork > 0); + if (IsOverflow) + { + _data &= ~(1ul << 63); + return false; + } + else + { + NumProcessingWork--; + // This should never underflow + Debug.Assert(NumProcessingWork >= 0); + return true; + } + } + /// /// Number of thread pool threads that currently exist. 
/// diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.WorkerThread.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.WorkerThread.cs index a34e0f8ff98c4e..4380722d78f32d 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.WorkerThread.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.WorkerThread.cs @@ -1,6 +1,7 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. +using System.Diagnostics; using System.Diagnostics.Tracing; using System.Runtime.CompilerServices; @@ -15,7 +16,11 @@ private static partial class WorkerThread { private static readonly short ThreadsToKeepAlive = DetermineThreadsToKeepAlive(); - private const int SemaphoreSpinCountDefault = 70; + // Spinning in the threadpool semaphore is not always useful. + // For example the new workitems may be produced by non-pool threads and could only arrive if pool threads start blocking. + // We will limit spinning to roughly 512-1024 spinwaits, each taking 35-50ns. That should be under 50 usec total. + // For reference the wakeup latency of a futex/event with threads queued up is reported to be in 5-50 usec range. (year 2025) + private const int SemaphoreSpinCountDefault = 9; // This value represents an assumption of how much uncommitted stack space a worker thread may use in the future. 
// Used in calculations to estimate when to throttle the rate of thread injection to reduce the possibility of @@ -42,9 +47,8 @@ private static short DetermineThreadsToKeepAlive() /// private static readonly LowLevelLifoSemaphore s_semaphore = new LowLevelLifoSemaphore( - 0, MaxPossibleThreadCount, - AppContextConfigHelper.GetInt32ComPlusOrDotNetConfig( + (uint)AppContextConfigHelper.GetInt32ComPlusOrDotNetConfig( "System.Threading.ThreadPool.UnfairSemaphoreSpinLimit", "ThreadPool_UnfairSemaphoreSpinLimit", SemaphoreSpinCountDefault, @@ -112,12 +116,13 @@ private static void WorkerThreadStart() while (true) { - bool spinWait = true; - while (semaphore.Wait(timeoutMs, spinWait)) + while (semaphore.Wait(timeoutMs)) { - WorkerDoWork(threadPoolInstance, ref spinWait); + WorkerDoWork(threadPoolInstance); } + // We've timed out waiting on the semaphore. Time to exit. + // In rare cases we may be asked to keep running/waiting. if (ShouldExitWorker(threadPoolInstance, threadAdjustmentLock)) { break; @@ -125,57 +130,33 @@ private static void WorkerThreadStart() } } - [MethodImpl(MethodImplOptions.AggressiveInlining)] - private static void WorkerDoWork(PortableThreadPool threadPoolInstance, ref bool spinWait) + private static void WorkerDoWork(PortableThreadPool threadPoolInstance) { - bool alreadyRemovedWorkingWorker = false; - while (TakeActiveRequest(threadPoolInstance)) + do { - threadPoolInstance._separated.lastDequeueTime = Environment.TickCount; - if (!ThreadPoolWorkQueue.Dispatch()) + // We generally avoid spurious wakes as they are wasteful, so we nearly always should see a request. + // However, we allow external wakes when thread goals change, which can result in "stolen" requests, + // thus sometimes there is no active request and we need to check. 
+ if (threadPoolInstance._separated._hasOutstandingThreadRequest != 0 && + Interlocked.Exchange(ref threadPoolInstance._separated._hasOutstandingThreadRequest, 0) != 0) { - // ShouldStopProcessingWorkNow() caused the thread to stop processing work, and it would have - // already removed this working worker in the counts. This typically happens when hill climbing - // decreases the worker thread count goal. - alreadyRemovedWorkingWorker = true; - break; - } - - if (threadPoolInstance._separated.numRequestedWorkers <= 0) - { - break; - } - - // In highly bursty cases with short bursts of work, especially in the portable thread pool - // implementation, worker threads are being released and entering Dispatch very quickly, not finding - // much work in Dispatch, and soon afterwards going back to Dispatch, causing extra thrashing on - // data and some interlocked operations, and similarly when the thread pool runs out of work. Since - // there is a pending request for work, introduce a slight delay before serving the next request. - // The spin-wait is mainly for when the sleep is not effective due to there being no other threads - // to schedule. - Thread.UninterruptibleSleep0(); - if (!Environment.IsSingleProcessor) - { - Thread.SpinWait(1); + // We took the request, now we must Dispatch some work items. + threadPoolInstance.NotifyDispatchProgress(Environment.TickCount); + if (!ThreadPoolWorkQueue.Dispatch()) + { + // We are above goal and would have already removed this working worker in the counts. 
+ return; + } } - } - // Don't spin-wait on the semaphore next time if the thread was actively stopped from processing work, - // as it's unlikely that the worker thread count goal would be increased again so soon afterwards that - // the semaphore would be released within the spin-wait window - spinWait = !alreadyRemovedWorkingWorker; - - if (!alreadyRemovedWorkingWorker) - { - // If we woke up but couldn't find a request, or ran out of work items to process, we need to update - // the number of working workers to reflect that we are done working for now - RemoveWorkingWorker(threadPoolInstance); - } + // We could not find more work in the queue and will try to stop being active. + // One caveat - in overflow state we may have cleared a work request without asking for a worker. + // Thus if there is uncleared overflow, one thread will be back for another round - without consuming a wake. + } while (!TryRemoveWorkingWorker(threadPoolInstance)); } // returns true if the worker is shutting down // returns false if we should do another iteration - [MethodImpl(MethodImplOptions.AggressiveInlining)] private static bool ShouldExitWorker(PortableThreadPool threadPoolInstance, LowLevelLock threadAdjustmentLock) { // The thread cannot exit if it has IO pending, otherwise the IO may be canceled @@ -235,80 +216,65 @@ private static bool ShouldExitWorker(PortableThreadPool threadPoolInstance, LowL } /// - /// Reduce the number of working workers by one, but maybe add back a worker (possibily this thread) if a thread request comes in while we are marking this thread as not working. + /// Tries to reduce the number of working workers by one. + /// If we are in a state of overflow, clears the overflow instead and returns false. + /// Returns true if number of active threads was actually reduced. 
/// - private static void RemoveWorkingWorker(PortableThreadPool threadPoolInstance) + private static bool TryRemoveWorkingWorker(PortableThreadPool threadPoolInstance) { - // A compare-exchange loop is used instead of Interlocked.Decrement or Interlocked.Add to defensively prevent - // NumProcessingWork from underflowing. See the setter for NumProcessingWork. - ThreadCounts counts = threadPoolInstance._separated.counts; + uint collisionCount = 0; while (true) { - ThreadCounts newCounts = counts; - newCounts.NumProcessingWork--; - - ThreadCounts countsBeforeUpdate = - threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts); - if (countsBeforeUpdate == counts) + ThreadCounts oldCounts = threadPoolInstance._separated.counts; + ThreadCounts newCounts = oldCounts; + bool decremented = newCounts.TryDecrementProcessingWork(); + if (threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, oldCounts) == oldCounts) { - break; + return decremented; } - counts = countsBeforeUpdate; - } - - // It's possible that we decided we had thread requests just before a request came in, - // but reduced the worker count *after* the request came in. In this case, we might - // miss the notification of a thread request. So we wake up a thread (maybe this one!) - // if there is work to do. - if (threadPoolInstance._separated.numRequestedWorkers > 0) - { - MaybeAddWorkingWorker(threadPoolInstance); + // This can be fairly contentious when threadpool runs out of work and all threads try to leave. + Backoff.Exponential(collisionCount++); } } + /// In a state of overflow does nothing. + /// Otherwise increments the active worker count and signals the semaphore. + /// Incrementing the count turns on the overflow state if the active thread limit is reached. 
internal static void MaybeAddWorkingWorker(PortableThreadPool threadPoolInstance) { - ThreadCounts counts = threadPoolInstance._separated.counts; - short numExistingThreads, numProcessingWork, newNumExistingThreads, newNumProcessingWork; + ThreadCounts oldCounts, newCounts; + bool incremented; + uint collisionCount = 0; while (true) { - numProcessingWork = counts.NumProcessingWork; - if (numProcessingWork >= counts.NumThreadsGoal) - { - return; - } - - newNumProcessingWork = (short)(numProcessingWork + 1); - numExistingThreads = counts.NumExistingThreads; - newNumExistingThreads = Math.Max(numExistingThreads, newNumProcessingWork); - - ThreadCounts newCounts = counts; - newCounts.NumProcessingWork = newNumProcessingWork; - newCounts.NumExistingThreads = newNumExistingThreads; - - ThreadCounts oldCounts = threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts); - - if (oldCounts == counts) + oldCounts = threadPoolInstance._separated.counts; + newCounts = oldCounts; + incremented = newCounts.TryIncrementProcessingWork(); + newCounts.NumExistingThreads = Math.Max(newCounts.NumProcessingWork, newCounts.NumExistingThreads); + if (threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, oldCounts) == oldCounts) { break; } - counts = oldCounts; + // This is less contentious than Remove as reasons to add threads are more complex to avoid adding too many too fast. + // We can still see some amount of failed interlocked operations here when a burst of work is scheduled. 
+ Backoff.Exponential(collisionCount++); } - int toCreate = newNumExistingThreads - numExistingThreads; - int toRelease = newNumProcessingWork - numProcessingWork; - - if (toRelease > 0) + if (!incremented) { - s_semaphore.Release(toRelease); + return; } - while (toCreate > 0) + Debug.Assert(newCounts.NumProcessingWork - oldCounts.NumProcessingWork == 1); + s_semaphore.Signal(); + + int toCreate = newCounts.NumExistingThreads - oldCounts.NumExistingThreads; + Debug.Assert(toCreate == 0 || toCreate == 1); + if (toCreate != 0) { CreateWorkerThread(); - toCreate--; } } @@ -326,10 +292,7 @@ internal static bool ShouldStopProcessingWorkNow(PortableThreadPool threadPoolIn // When there are more threads processing work than the thread count goal, it may have been decided // to decrease the number of threads. Stop processing if the counts can be updated. We may have more // threads existing than the thread count goal and that is ok, the cold ones will eventually time out if - // the thread count goal is not increased again. This logic is a bit different from the original CoreCLR - // code from which this implementation was ported, which turns a processing thread into a retired thread - // and checks for pending requests like RemoveWorkingWorker. In this implementation there are - // no retired threads, so only the count of threads processing work is considered. + // the thread count goal is not increased again. 
if (counts.NumProcessingWork <= counts.NumThreadsGoal) { return false; @@ -347,21 +310,6 @@ internal static bool ShouldStopProcessingWorkNow(PortableThreadPool threadPoolIn counts = oldCounts; } } - - private static bool TakeActiveRequest(PortableThreadPool threadPoolInstance) - { - int count = threadPoolInstance._separated.numRequestedWorkers; - while (count > 0) - { - int prevCount = Interlocked.CompareExchange(ref threadPoolInstance._separated.numRequestedWorkers, count - 1, count); - if (prevCount == count) - { - return true; - } - count = prevCount; - } - return false; - } } } } diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.cs index a2184f544cf1ad..d53f03ce2a81f7 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.cs @@ -87,8 +87,10 @@ private struct CacheLineSeparated [FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 1)] public ThreadCounts counts; // SOS's ThreadPool command depends on this name + // Periodically updated heartbeat timestamp to indicate that we are making progress. + // Used in starvation detection. [FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 2)] - public int lastDequeueTime; + public int lastDispatchTime; [FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 3)] public int priorCompletionCount; @@ -97,8 +99,20 @@ private struct CacheLineSeparated [FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 3 + sizeof(int) * 2)] public int nextCompletedWorkRequestsTime; + // This flag is used for communication between item enqueuing and workers that process the items. + // There are two states of this flag: + // 0: has no guarantees + // 1: means a worker will check work queues and ensure that + // any work items inserted in work queue before setting the flag + // are picked up. 
+ // Note: The state must be cleared by the worker thread _before_ + // checking. Otherwise there is a window between finding no work + // and resetting the flag, when the flag is in a wrong state. + // A new work item may be added right before the flag is reset + // without asking for a worker, while the last worker is quitting. [FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 4)] - public volatile int numRequestedWorkers; + public int _hasOutstandingThreadRequest; + [FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 4 + sizeof(int))] public int gateThreadRunningState; } @@ -209,7 +223,7 @@ public bool SetMinThreads(int workerThreads, int ioCompletionThreads) else if (_separated.counts.NumThreadsGoal < newMinThreads) { _separated.counts.InterlockedSetNumThreadsGoal(newMinThreads); - if (_separated.numRequestedWorkers > 0) + if (_separated._hasOutstandingThreadRequest != 0) { addWorker = true; } @@ -330,26 +344,30 @@ private ThreadInt64PersistentCounter.ThreadLocalNode CreateThreadLocalCompletion return threadLocalCompletionCountNode; } - private void NotifyWorkItemProgress(ThreadInt64PersistentCounter.ThreadLocalNode threadLocalCompletionCountNode, int currentTimeMs) + private static void NotifyWorkItemProgress(ThreadInt64PersistentCounter.ThreadLocalNode threadLocalCompletionCountNode) { threadLocalCompletionCountNode.Increment(); - _separated.lastDequeueTime = currentTimeMs; + } + internal void NotifyWorkItemProgress() + { + NotifyWorkItemProgress(GetOrCreateThreadLocalCompletionCountNode()); + } + + internal bool NotifyWorkItemComplete(ThreadInt64PersistentCounter.ThreadLocalNode threadLocalCompletionCountNode, int currentTimeMs) + { + NotifyWorkItemProgress(threadLocalCompletionCountNode); if (ShouldAdjustMaxWorkersActive(currentTimeMs)) { AdjustMaxWorkersActive(); } - } - internal void NotifyWorkItemProgress() => - NotifyWorkItemProgress(GetOrCreateThreadLocalCompletionCountNode(), Environment.TickCount); + return 
!WorkerThread.ShouldStopProcessingWorkNow(this); + } - internal bool NotifyWorkItemComplete(ThreadInt64PersistentCounter.ThreadLocalNode? threadLocalCompletionCountNode, int currentTimeMs) + internal void NotifyDispatchProgress(int currentTickCount) { - Debug.Assert(threadLocalCompletionCountNode != null); - - NotifyWorkItemProgress(threadLocalCompletionCountNode, currentTimeMs); - return !WorkerThread.ShouldStopProcessingWorkNow(this); + _separated.lastDispatchTime = currentTickCount; } // @@ -459,13 +477,15 @@ private bool ShouldAdjustMaxWorkersActive(int currentTimeMs) return _pendingBlockingAdjustment == PendingBlockingAdjustment.None; } - internal void RequestWorker() + internal void EnsureWorkerRequested() { - // The order of operations here is important. MaybeAddWorkingWorker() and EnsureRunning() use speculative checks to - // do their work and the memory barrier from the interlocked operation is necessary in this case for correctness. - Interlocked.Increment(ref _separated.numRequestedWorkers); - WorkerThread.MaybeAddWorkingWorker(this); - GateThread.EnsureRunning(this); + // Only one worker is requested at a time to mitigate Thundering Herd problem. 
+ if (_separated._hasOutstandingThreadRequest == 0 && + Interlocked.Exchange(ref _separated._hasOutstandingThreadRequest, 1) == 0) + { + WorkerThread.MaybeAddWorkingWorker(this); + GateThread.EnsureRunning(this); + } } private bool OnGen2GCCallback() diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Browser.Threads.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Browser.Threads.cs index 7933e49db422b9..d2515952b4d8e1 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Browser.Threads.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Browser.Threads.cs @@ -8,6 +8,7 @@ public static partial class ThreadPool // Indicates that the threadpool should yield the thread from the dispatch loop to the // runtime periodically. We use this to return back to the JS event loop so that the JS // event queue can be drained - internal static bool YieldFromDispatchLoop => true; +#pragma warning disable IDE0060 // Remove unused parameter + internal static bool YieldFromDispatchLoop(int currentTickCount) => true; +#pragma warning restore IDE0060 } } -} diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Browser.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Browser.cs index 5250e1df5f1ab9..3d1428d0c53b62 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Browser.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Browser.cs @@ -35,7 +35,9 @@ public static partial class ThreadPool { // Indicates whether the thread pool should yield the thread from the dispatch loop to the runtime periodically so that // the runtime may use the thread for processing other work - internal static bool YieldFromDispatchLoop => true; +#pragma warning disable IDE0060 // Remove unused parameter + internal static bool YieldFromDispatchLoop(int currentTickCount) => true; +#pragma 
warning restore IDE0060 private const bool IsWorkerTrackingEnabledInConfig = false; @@ -78,7 +80,7 @@ public static void GetAvailableThreads(out int workerThreads, out int completion public static long CompletedWorkItemCount => 0; [DynamicDependency("BackgroundJobHandler")] // https://github.com/dotnet/runtime/issues/101434 - internal static unsafe void RequestWorkerThread() + internal static unsafe void EnsureWorkerRequested() { if (_callbackQueued) return; diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Unix.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Unix.cs index b100409793ba20..236aa07bc32aae 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Unix.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Unix.cs @@ -19,7 +19,11 @@ public static partial class ThreadPool #if !(TARGET_BROWSER && FEATURE_WASM_MANAGED_THREADS) // Indicates whether the thread pool should yield the thread from the dispatch loop to the runtime periodically so that // the runtime may use the thread for processing other work. - internal static bool YieldFromDispatchLoop => false; + internal static bool YieldFromDispatchLoop(int currentTickCount) + { + PortableThreadPool.ThreadPoolInstance.NotifyDispatchProgress(currentTickCount); + return false; + } #endif internal static ThreadInt64PersistentCounter.ThreadLocalNode GetOrCreateThreadLocalCompletionCountNode() => @@ -67,9 +71,9 @@ internal static bool NotifyWorkItemComplete(ThreadInt64PersistentCounter.ThreadL /// /// This method is called to request a new thread pool worker to handle pending work. 
/// - internal static unsafe void RequestWorkerThread() + internal static unsafe void EnsureWorkerRequested() { - PortableThreadPool.ThreadPoolInstance.RequestWorker(); + PortableThreadPool.ThreadPoolInstance.EnsureWorkerRequested(); } internal static void ReportThreadStatus(bool isWorking) diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Wasi.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Wasi.cs index fa499f0fd857fb..5f983e1d3c4389 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Wasi.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Wasi.cs @@ -35,7 +35,9 @@ public static partial class ThreadPool { // Indicates whether the thread pool should yield the thread from the dispatch loop to the runtime periodically so that // the runtime may use the thread for processing other work - internal static bool YieldFromDispatchLoop => true; +#pragma warning disable IDE0060 // Remove unused parameter + internal static bool YieldFromDispatchLoop(int currentTickCount) => true; +#pragma warning restore IDE0060 private const bool IsWorkerTrackingEnabledInConfig = false; @@ -75,7 +77,7 @@ public static void GetAvailableThreads(out int workerThreads, out int completion public static long CompletedWorkItemCount => 0; - internal static unsafe void RequestWorkerThread() + internal static unsafe void EnsureWorkerRequested() { } diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Windows.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Windows.cs index 0ad70d35a92c32..223cea9c318e4e 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Windows.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Windows.cs @@ -30,11 +30,19 @@ public static partial class ThreadPool // Indicates whether the thread pool should yield the thread from the dispatch loop to the runtime 
 periodically so that // the runtime may use the thread for processing other work. - // - // Windows thread pool threads need to yield back to the thread pool periodically, otherwise those threads may be - // considered to be doing long-running work and change thread pool heuristics, such as slowing or halting thread - // injection. - internal static bool YieldFromDispatchLoop => UseWindowsThreadPool; + internal static bool YieldFromDispatchLoop(int currentTickCount) + { + if (UseWindowsThreadPool) + { + // Windows thread pool threads need to yield back to the thread pool periodically, otherwise those threads may be + // considered to be doing long-running work and change thread pool heuristics, such as slowing or halting thread + // injection. + return true; + } + + PortableThreadPool.ThreadPoolInstance.NotifyDispatchProgress(currentTickCount); + return false; + } [CLSCompliant(false)] [SupportedOSPlatform("windows")] @@ -155,17 +163,24 @@ internal static void NotifyThreadUnblocked() } /// - /// This method is called to request a new thread pool worker to handle pending work. + /// This method is called to notify the thread pool about pending work. + /// It will start with an ordinary read to check if a request is already pending as we + /// optimize for a case when queues already have items and this flag is already set. + /// Make sure that the presence of the item that is being added to the queue is visible + /// before calling this. + /// Typically this is not a problem when enqueuing uses an interlocked update of the queue + /// index to establish the presence of the new item. More care may be needed when an item + /// is inserted via ordinary or volatile writes.
 /// - internal static void RequestWorkerThread() + internal static void EnsureWorkerRequested() { if (ThreadPool.UseWindowsThreadPool) { - WindowsThreadPool.RequestWorkerThread(); + WindowsThreadPool.EnsureWorkerRequested(); } else { - PortableThreadPool.ThreadPoolInstance.RequestWorker(); + PortableThreadPool.ThreadPoolInstance.EnsureWorkerRequested(); } } diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs index b3bdc1b80f7077..0480f93dfa35b1 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs @@ -127,7 +127,11 @@ public void LocalPush(object obj) // When there are at least 2 elements' worth of space, we can take the fast path. if (tail < m_headIndex + m_mask) { - Volatile.Write(ref m_array[tail & m_mask], obj); + m_array[tail & m_mask] = obj; + // The following write makes the slot "appear" in the queue. + // It must happen after the write of the item, and it does, since m_tailIndex is volatile. + // NOTE: we also must be sure this write is not delayed past our check for a + // pending thread request. m_tailIndex = tail + 1; } else @@ -156,7 +160,11 @@ public void LocalPush(object obj) m_mask = (m_mask << 1) | 1; } - Volatile.Write(ref m_array[tail & m_mask], obj); + m_array[tail & m_mask] = obj; + // The following write makes the slot "appear" in the queue. + // It must happen after the write of the item, and it does, since m_tailIndex is volatile. + // NOTE: we also must be sure this write is not delayed past our check for a + // pending thread request.
m_tailIndex = tail + 1; } finally @@ -165,6 +173,10 @@ public void LocalPush(object obj) m_foreignLock.Exit(useMemoryBarrier: false); } } + + // Our caller will check for a thread request now (with an ordinary read), + // make sure the check happens after the new slot appears in the queue. + Interlocked.MemoryBarrier(); } [MethodImpl(MethodImplOptions.NoInlining)] @@ -410,7 +422,6 @@ public int Count private bool _loggingEnabled; private bool _dispatchNormalPriorityWorkFirst; - private bool _mayHaveHighPriorityWorkItems; // SOS's ThreadPool command depends on the following names internal readonly WorkQueue workItems = new WorkQueue(); @@ -431,29 +442,6 @@ public int Count private readonly int[] _assignedWorkItemQueueThreadCounts = s_assignableWorkItemQueueCount > 0 ? new int[s_assignableWorkItemQueueCount] : Array.Empty(); - [StructLayout(LayoutKind.Sequential)] - private struct CacheLineSeparated - { - private readonly Internal.PaddingFor32 pad1; - - // This flag is used for communication between item enqueuing and workers that process the items. - // There are two states of this flag: - // 0: has no guarantees - // 1: means a worker will check work queues and ensure that - // any work items inserted in work queue before setting the flag - // are picked up. - // Note: The state must be cleared by the worker thread _before_ - // checking. Otherwise there is a window between finding no work - // and resetting the flag, when the flag is in a wrong state. - // A new work item may be added right before the flag is reset - // without asking for a worker, while the last worker is quitting. 
- public int _hasOutstandingThreadRequest; - - private readonly Internal.PaddingFor32 pad2; - } - - private CacheLineSeparated _separated; - public ThreadPoolWorkQueue() { for (int i = 0; i < s_assignableWorkItemQueueCount; i++) @@ -464,48 +452,6 @@ public ThreadPoolWorkQueue() RefreshLoggingEnabled(); } - private void AssignWorkItemQueue(ThreadPoolWorkQueueThreadLocals tl) - { - Debug.Assert(s_assignableWorkItemQueueCount > 0); - - _queueAssignmentLock.Acquire(); - - // Determine the first queue that has not yet been assigned to the limit of worker threads - int queueIndex = -1; - int minCount = int.MaxValue; - int minCountQueueIndex = 0; - for (int i = 0; i < s_assignableWorkItemQueueCount; i++) - { - int count = _assignedWorkItemQueueThreadCounts[i]; - Debug.Assert(count >= 0); - if (count < ProcessorsPerAssignableWorkItemQueue) - { - queueIndex = i; - _assignedWorkItemQueueThreadCounts[queueIndex] = count + 1; - break; - } - - if (count < minCount) - { - minCount = count; - minCountQueueIndex = i; - } - } - - if (queueIndex < 0) - { - // All queues have been fully assigned. Choose the queue that has been assigned to the least number of worker - // threads. 
- queueIndex = minCountQueueIndex; - _assignedWorkItemQueueThreadCounts[queueIndex]++; - } - - _queueAssignmentLock.Release(); - - tl.queueIndex = queueIndex; - tl.assignedGlobalWorkItemQueue = _assignableWorkItemQueues[queueIndex]; - } - private void TryReassignWorkItemQueue(ThreadPoolWorkQueueThreadLocals tl) { Debug.Assert(s_assignableWorkItemQueueCount > 0); @@ -521,6 +467,13 @@ private void TryReassignWorkItemQueue(ThreadPoolWorkQueueThreadLocals tl) return; } + // if not assigned yet, assume temporarily that the last queue is assigned + if (queueIndex == -1) + { + queueIndex = _assignedWorkItemQueueThreadCounts.Length - 1; + _assignedWorkItemQueueThreadCounts[queueIndex]++; + } + // If the currently assigned queue is assigned to other worker threads, try to reassign an earlier queue to this // worker thread if the earlier queue is not assigned to the limit of worker threads Debug.Assert(_assignedWorkItemQueueThreadCounts[queueIndex] >= 0); @@ -549,6 +502,11 @@ private void UnassignWorkItemQueue(ThreadPoolWorkQueueThreadLocals tl) Debug.Assert(s_assignableWorkItemQueueCount > 0); int queueIndex = tl.queueIndex; + if (queueIndex == -1) + { + // a queue was never assigned + return; + } _queueAssignmentLock.Acquire(); int newCount = --_assignedWorkItemQueueThreadCounts[queueIndex]; @@ -572,8 +530,12 @@ private void UnassignWorkItemQueue(ThreadPoolWorkQueueThreadLocals tl) if (movedWorkItem) { - EnsureThreadRequested(); + ThreadPool.EnsureWorkerRequested(); } + + // unassigned state + tl.queueIndex = -1; + tl.assignedGlobalWorkItemQueue = workItems; } public ThreadPoolWorkQueueThreadLocals GetOrCreateThreadLocals() => @@ -608,16 +570,6 @@ public void RefreshLoggingEnabledFull() _loggingEnabled = FrameworkEventSource.Log.IsEnabled(EventLevel.Verbose, FrameworkEventSource.Keywords.ThreadPool | FrameworkEventSource.Keywords.ThreadTransfer); } - [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal void EnsureThreadRequested() - { - // Only one worker is 
requested at a time to mitigate Thundering Herd problem. - if (Interlocked.Exchange(ref _separated._hasOutstandingThreadRequest, 1) == 0) - { - ThreadPool.RequestWorkerThread(); - } - } - public void Enqueue(object callback, bool forceGlobal) { Debug.Assert((callback is IThreadPoolWorkItem) ^ (callback is Task)); @@ -648,7 +600,7 @@ public void Enqueue(object callback, bool forceGlobal) } } - EnsureThreadRequested(); + ThreadPool.EnsureWorkerRequested(); } #if CORECLR @@ -696,10 +648,7 @@ public void EnqueueAtHighPriority(object workItem) highPriorityWorkItems.Enqueue(workItem); - // If the change below is seen by another thread, ensure that the enqueued work item will also be visible - Volatile.Write(ref _mayHaveHighPriorityWorkItems, true); - - EnsureThreadRequested(); + ThreadPool.EnsureWorkerRequested(); } internal static void TransferAllLocalWorkItemsToHighPriorityGlobalQueue() @@ -713,39 +662,32 @@ internal static void TransferAllLocalWorkItemsToHighPriorityGlobalQueue() // Pop each work item off the local queue and push it onto the global. This is a // bounded loop as no other thread is allowed to push into this thread's queue. ThreadPoolWorkQueue queue = ThreadPool.s_workQueue; - bool addedHighPriorityWorkItem = false; - bool ensureThreadRequest = false; + bool ensureWorkerRequest = false; while (tl.workStealingQueue.LocalPop() is object workItem) { + // A work item had been removed temporarily and other threads may have missed stealing it, so ensure that + // there will be a thread request + ensureWorkerRequest = true; + // If there's an unexpected exception here that happens to get handled, the lost work item, or missing thread // request, etc., may lead to other issues. A fail-fast or try-finally here could reduce the effect of such // uncommon issues to various degrees, but it's also uncommon to check for unexpected exceptions. 
try { queue.highPriorityWorkItems.Enqueue(workItem); - addedHighPriorityWorkItem = true; } catch (OutOfMemoryException) { // This is not expected to throw under normal circumstances tl.workStealingQueue.LocalPush(workItem); - // A work item had been removed temporarily and other threads may have missed stealing it, so ensure that - // there will be a thread request - ensureThreadRequest = true; break; } } - if (addedHighPriorityWorkItem) - { - Volatile.Write(ref queue._mayHaveHighPriorityWorkItems, true); - ensureThreadRequest = true; - } - - if (ensureThreadRequest) + if (ensureWorkerRequest) { - queue.EnsureThreadRequested(); + ThreadPool.EnsureWorkerRequested(); } } @@ -774,9 +716,11 @@ internal static bool LocalFindAndPop(object callback) tl.isProcessingHighPriorityWorkItems = false; } - else if ( - _mayHaveHighPriorityWorkItems && - Interlocked.CompareExchange(ref _mayHaveHighPriorityWorkItems, false, true) && +#if FEATURE_SINGLE_THREADED + else if (highPriorityWorkItems.Count == 0 && +#else + else if (!highPriorityWorkItems.IsEmpty && +#endif TryStartProcessingHighPriorityWorkItemsAndDequeue(tl, out workItem)) { return workItem; @@ -855,7 +799,6 @@ private bool TryStartProcessingHighPriorityWorkItemsAndDequeue( } tl.isProcessingHighPriorityWorkItems = true; - _mayHaveHighPriorityWorkItems = true; return true; } @@ -934,33 +877,15 @@ internal static bool Dispatch() { ThreadPoolWorkQueue workQueue = ThreadPool.s_workQueue; ThreadPoolWorkQueueThreadLocals tl = workQueue.GetOrCreateThreadLocals(); - - if (s_assignableWorkItemQueueCount > 0) - { - workQueue.AssignWorkItemQueue(tl); - } - - // Before dequeuing the first work item, acknowledge that the thread request has been satisfied - workQueue._separated._hasOutstandingThreadRequest = 0; - - // The state change must happen before sweeping queues for items. - Interlocked.MemoryBarrier(); - object? 
workItem = DequeueWithPriorityAlternation(workQueue, tl, out bool missedSteal); if (workItem == null) { - if (s_assignableWorkItemQueueCount > 0) - { - workQueue.UnassignWorkItemQueue(tl); - } - // Missing a steal means there may be an item that we were unable to get. - // Effectively, we failed to fulfill our promise to check the queues after - // clearing "Scheduled" flag. + // Effectively, we failed to fulfill our promise to check the queues for work. // We need to make sure someone will do another pass. if (missedSteal) { - workQueue.EnsureThreadRequested(); + ThreadPool.EnsureWorkerRequested(); } // Tell the VM we're returning normally, not because Hill Climbing asked us to return. @@ -972,7 +897,7 @@ internal static bool Dispatch() // In a worst case the current workitem will indirectly depend on progress of other // items and that would lead to a deadlock if no one else checks the queue. // We must ensure at least one more worker is coming if the queue is not empty. - workQueue.EnsureThreadRequested(); + ThreadPool.EnsureWorkerRequested(); // // After this point, this method is no longer responsible for ensuring thread requests except for missed steals @@ -1019,7 +944,7 @@ internal static bool Dispatch() // if (missedSteal) { - workQueue.EnsureThreadRequested(); + ThreadPool.EnsureWorkerRequested(); } return true; @@ -1089,7 +1014,7 @@ internal static bool Dispatch() // The quantum expired, do any necessary periodic activities - if (ThreadPool.YieldFromDispatchLoop) + if (ThreadPool.YieldFromDispatchLoop(currentTickCount)) { // The runtime-specific thread pool implementation requires the Dispatch loop to return to the VM // periodically to let it perform its own work @@ -1105,6 +1030,10 @@ internal static bool Dispatch() { // Due to hill climbing, over time arbitrary worker threads may stop working and eventually unbalance the // queue assignments. Periodically try to reassign a queue to keep the assigned queues busy. 
+ // + // This can also be the first time the queue is assigned. + // We do not assign eagerly at the beginning of Dispatch as we would need to take _queueAssignmentLock + // and that lock may cause massive contentions if many threads start dispatching. workQueue.TryReassignWorkItemQueue(tl); } @@ -1179,6 +1108,7 @@ internal sealed class ThreadPoolWorkQueueThreadLocals public ThreadPoolWorkQueueThreadLocals(ThreadPoolWorkQueue tpq) { assignedGlobalWorkItemQueue = tpq.workItems; + queueIndex = -1; workQueue = tpq; workStealingQueue = new ThreadPoolWorkQueue.WorkStealingQueue(); ThreadPoolWorkQueue.WorkStealingQueueList.Add(workStealingQueue); diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/WindowsThreadPool.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/WindowsThreadPool.cs index 25fa8cda1ad3e6..341dadb705db28 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/WindowsThreadPool.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/WindowsThreadPool.cs @@ -27,6 +27,29 @@ internal static class WindowsThreadPool private static IntPtr s_work; + [StructLayout(LayoutKind.Sequential)] + private struct CacheLineSeparated + { + private readonly Internal.PaddingFor32 pad1; + + // This flag is used for communication between item enqueuing and workers that process the items. + // There are two states of this flag: + // 0: has no guarantees + // 1: means a worker will check work queues and ensure that + // any work items inserted in work queue before setting the flag + // are picked up. + // Note: The state must be cleared by the worker thread _before_ + // checking. Otherwise there is a window between finding no work + // and resetting the flag, when the flag is in a wrong state. + // A new work item may be added right before the flag is reset + // without asking for a worker, while the last worker is quitting. 
+ public int _hasOutstandingThreadRequest; + + private readonly Internal.PaddingFor32 pad2; + } + + private static CacheLineSeparated _separated; + private sealed class ThreadCountHolder { internal ThreadCountHolder() => Interlocked.Increment(ref s_threadCount); @@ -147,6 +170,10 @@ private static void DispatchCallback(IntPtr instance, IntPtr context, IntPtr wor var wrapper = ThreadPoolCallbackWrapper.Enter(); Debug.Assert(s_work == work); + // Before looking for work items, acknowledge that the thread request has been satisfied + _separated._hasOutstandingThreadRequest = 0; + // NOTE: the thread request must be cleared before doing Dispatch. + // the following Interlocked.Increment will guarantee the ordering. Interlocked.Increment(ref s_workingThreadCounter.Count); ThreadPoolWorkQueue.Dispatch(); Interlocked.Decrement(ref s_workingThreadCounter.Count); @@ -155,7 +182,17 @@ private static void DispatchCallback(IntPtr instance, IntPtr context, IntPtr wor wrapper.Exit(resetThread: false); } - internal static unsafe void RequestWorkerThread() + internal static void EnsureWorkerRequested() + { + // Only one worker is requested at a time to mitigate Thundering Herd problem. + if (_separated._hasOutstandingThreadRequest == 0 && + Interlocked.Exchange(ref _separated._hasOutstandingThreadRequest, 1) == 0) + { + RequestWorkerThread(); + } + } + + private static unsafe void RequestWorkerThread() { if (s_work == IntPtr.Zero) {