Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion bench/taskscheduler/BenchTaskSchedulerTemplate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,8 @@ namespace bench {
uint32_t aTaskCount,
uint32_t aExecuteCount)
{
TaskScheduler sched("bench", aThreadCount, 5000);
TaskScheduler sched("bench", 5000);
sched.Start(aThreadCount);
sched.Reserve(aTaskCount);
BenchTaskCtl ctl(aTaskCount, aExecuteCount, &sched);
ctl.Warmup();
Expand Down
22 changes: 16 additions & 6 deletions bench/taskscheduler/BenchTaskSchedulerTrivial.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,13 @@ namespace bench {
public:
TaskScheduler(
const char* aName,
uint32_t aThreadCount,
uint32_t aSubQueueSize);

~TaskScheduler();

void Start(
uint32_t aThreadCount);

void Post(
Task* aTask);

Expand All @@ -85,6 +87,7 @@ namespace bench {
bool myIsStopped;
TaskList myQueue;
std::vector<TaskSchedulerThread*> myWorkers;
const std::string myName;

friend TaskSchedulerThread;
};
Expand Down Expand Up @@ -146,13 +149,10 @@ namespace bench {

TaskScheduler::TaskScheduler(
const char* aName,
uint32_t aThreadCount,
uint32_t /*aSubQueueSize*/)
: myIsStopped(false)
: myIsStopped(true)
, myName(aName)
{
myWorkers.resize(aThreadCount);
for (TaskSchedulerThread*& w : myWorkers)
w = new TaskSchedulerThread(aName, this);
}

TaskScheduler::~TaskScheduler()
Expand All @@ -165,6 +165,16 @@ namespace bench {
delete w;
}

void
TaskScheduler::Start(
uint32_t aThreadCount)
{
myIsStopped = false;
myWorkers.resize(aThreadCount);
for (TaskSchedulerThread*& w : myWorkers)
w = new TaskSchedulerThread(myName.c_str(), this);
}

void
TaskScheduler::Post(
Task* aTask)
Expand Down
2 changes: 1 addition & 1 deletion examples/iocore_03_pipeline/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -418,9 +418,9 @@ ServeRequests(
// the requests, create MyRequest for each, and submit them to the scheduler. But in
// this case it is simplified to just a couple of hardcoded MyRequests.
mg::sch::TaskScheduler scheduler("tst",
1, // Thread count.
5 // Subqueue size.
);
scheduler.Start(1);
MG_LOG_INFO("ServeRequests", "got a couple of complex requests");
new MyRequest(1, aClient, scheduler);
new MyRequest(2, aClient, scheduler);
Expand Down
2 changes: 1 addition & 1 deletion examples/scheduler_01_simple_task/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ int
main()
{
mg::sch::TaskScheduler sched("tst",
1, // Thread count.
5 // Subqueue size.
);
sched.Start(1);
sched.Post(new mg::sch::Task([&](mg::sch::Task *self) {
std::cout << "Executed in scheduler!\n";
delete self;
Expand Down
2 changes: 1 addition & 1 deletion examples/scheduler_02_coroutine_task/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ main()
// Normally one would allocate tasks on the heap and make them delete themselves when
// they are finished.
mg::sch::TaskScheduler scheduler("tst",
1, // Thread count.
5 // Subqueue size.
);
scheduler.Start(1);
scheduler.Post(&task);
return 0;
}
2 changes: 1 addition & 1 deletion examples/scheduler_03_multistep_task/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ main()
// Normally one would allocate tasks on the heap and make them delete themselves when
// they are finished.
mg::sch::TaskScheduler scheduler("tst",
1, // Thread count.
5 // Subqueue size.
);
scheduler.Start(1);
scheduler.Post(&task);
return 0;
}
2 changes: 1 addition & 1 deletion examples/scheduler_04_interacting_tasks/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ main()
// Normally one would allocate tasks on the heap and make them delete themselves when
// they are finished.
mg::sch::TaskScheduler scheduler("tst",
1, // Thread count.
5 // Subqueue size.
);
scheduler.Start(1);

task.SetCallback([](
mg::sch::Task& aSelf) -> mg::box::Coro {
Expand Down
9 changes: 7 additions & 2 deletions src/mg/aio/IOTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,10 @@ namespace aio {
// immediately noticed by the scheduler (if the task was already in the front
// queue) and would be actually closed + deleted in some worker thread even before
// this function ends.
IOTaskStatus oldStatus = myStatus.ExchangeRelaxed(IOTASK_STATUS_CLOSING);
//
// Use 'release' to give the other threads a way to sync the writes done before
// closing, such as the close-guard setting.
IOTaskStatus oldStatus = myStatus.ExchangeRelease(IOTASK_STATUS_CLOSING);
MG_BOX_ASSERT_F(
oldStatus == IOTASK_STATUS_PENDING ||
oldStatus == IOTASK_STATUS_READY ||
Expand Down Expand Up @@ -260,10 +263,12 @@ namespace aio {
void
IOTask::PrivCloseDo()
{
// Use 'acquire' to sync all the writes done by the thread which has initiated the
// closing. Such as the close-guard setting.
MG_BOX_ASSERT(myStatus.LoadAcquire() == IOTASK_STATUS_CLOSING);
MG_BOX_ASSERT(myCloseGuard.LoadRelaxed());
MG_BOX_ASSERT(!myIsClosed);
MG_BOX_ASSERT(myNext == nullptr);
MG_BOX_ASSERT(myStatus.LoadRelaxed() == IOTASK_STATUS_CLOSING);
// Closed flag is ok to update non-atomically. Close is done in the scheduler, so
// the task is not executed in any worker now and they can't see this flag before
// the task's socket is finally removed from the kernel.
Expand Down
52 changes: 52 additions & 0 deletions src/mg/box/Atomic.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,58 @@ namespace box {
T DecrementFetch()
{ return myValue.fetch_sub(1, std::memory_order_seq_cst) - 1; }

T FetchBitOrRelaxed(
const T& aValue)
{ return myValue.fetch_or(aValue, std::memory_order_relaxed); }
T FetchBitOrAcquire(
const T& aValue)
{ return myValue.fetch_or(aValue, std::memory_order_acquire); }
T FetchBitOrRelease(
const T& aValue)
{ return myValue.fetch_or(aValue, std::memory_order_release); }
T FetchBitOr(
const T& aValue)
{ return myValue.fetch_or(aValue, std::memory_order_seq_cst); }

void BitOrRelaxed(
const T& aValue)
{ FetchBitOrRelaxed(aValue); }
void BitOrAcquire(
const T& aValue)
{ FetchBitOrAcquire(aValue); }
void BitOrRelease(
const T& aValue)
{ FetchBitOrRelease(aValue); }
void BitOr(
const T& aValue)
{ FetchBitOr(aValue); }

T FetchBitAndRelaxed(
const T& aValue)
{ return myValue.fetch_and(aValue, std::memory_order_relaxed); }
T FetchBitAndAcquire(
const T& aValue)
{ return myValue.fetch_and(aValue, std::memory_order_acquire); }
T FetchBitAndRelease(
const T& aValue)
{ return myValue.fetch_and(aValue, std::memory_order_release); }
T FetchBitAnd(
const T& aValue)
{ return myValue.fetch_and(aValue, std::memory_order_seq_cst); }

void BitAndRelaxed(
const T& aValue)
{ FetchBitAndRelaxed(aValue); }
void BitAndAcquire(
const T& aValue)
{ FetchBitAndAcquire(aValue); }
void BitAndRelease(
const T& aValue)
{ FetchBitAndRelease(aValue); }
void BitAnd(
const T& aValue)
{ FetchBitAnd(aValue); }

bool CmpExchgWeakRelaxed(
T& aExpected,
const T& aValue);
Expand Down
2 changes: 2 additions & 0 deletions src/mg/box/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ set(mgbox_src
ConditionVariable.cpp
Coro.cpp
Error.cpp
InterruptibleMutex.cpp
IOVec.cpp
Log.cpp
MultiConsumerQueueBase.cpp
Expand Down Expand Up @@ -50,6 +51,7 @@ set(install_headers
DoublyList.h
Error.h
ForwardList.h
InterruptibleMutex.h
IOVec.h
Log.h
MultiConsumerQueue.h
Expand Down
127 changes: 127 additions & 0 deletions src/mg/box/InterruptibleMutex.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
#include "InterruptibleMutex.h"

#include "mg/box/Thread.h"

namespace mg {
namespace box {
namespace {
enum InterruptibleMutexFlags
{
INTERRUPTIBLE_MUTEX_TAKEN = 0x1,
INTERRUPTIBLE_MUTEX_HAS_WAITERS = 0x2,
};
}
InterruptibleMutex::InterruptibleMutex()
: myState(0)
{
}

InterruptibleMutex::~InterruptibleMutex()
{
MG_BOX_ASSERT(myState.LoadRelaxed() == 0);
}

void
InterruptibleMutex::Lock(
const InterruptibleMutexWakeupCallback& aWakeup)
{
if (TryLock())
return;

// The mutex is owned by somebody. Try to take it over.
myMutex.Lock();
// Pessimistically this thread might have to wait, so lets add the corresponding
// flag. Use 'acquire' to sync all the writes done by the mutex's current owner.
// In case the owner is already leaving.
uint8_t expected = myState.FetchBitOrAcquire(
INTERRUPTIBLE_MUTEX_HAS_WAITERS);
if ((expected & INTERRUPTIBLE_MUTEX_TAKEN) == 0)
{
// Could happen than between trying to lock and taking the internal mutex the
// original owner has already left. If it actually left the ownership
// entirely, it means there were no waiters.
MG_BOX_ASSERT(myWaiters.IsEmpty());
// And no other thread could take the lock, because it would see the 'waiters'
// flag and would also try to take the internal mutex, which is owner by this
// thread atm. Hence it is safe to just take the ownership.
// 'Relaxed' should be enough, because acquire was already done just above.
expected = myState.ExchangeRelaxed(INTERRUPTIBLE_MUTEX_TAKEN);
// The waiters flag was set by this thread, but not anymore. New waiters, if
// any are waiting on the internal mutex, will just set this flag again if
// needed.
MG_BOX_ASSERT(expected == INTERRUPTIBLE_MUTEX_HAS_WAITERS);
myMutex.Unlock();
return;
}
// Some other thread owns the lock and this thread has announced its presence via
// the waiters-flag. Now time to wait until the owner will hand the ownership
// over.
InterruptibleMutexWaiter self;
if (myWaiters.IsEmpty())
{
// Only the first waiter wakes the owner up. Makes no sense to wake it
// multiple times.
aWakeup();
}
myWaiters.Append(&self);
myMutex.Unlock();

// Just wait for the ownership to be handed over.
self.mySignal.ReceiveBlocking();
// Use 'acquire' to sync all the writes done by the previous owner.
MG_BOX_ASSERT((myState.LoadAcquire() & INTERRUPTIBLE_MUTEX_TAKEN) != 0);
}

bool
InterruptibleMutex::TryLock()
{
// It is important that the lock can't be taken if there are already waiters. No
// thread can take the ownership out of order. Otherwise this might cause
// starvation of the waiters.
// Use 'acquire' to sync all the writes done by the previous owner.
uint8_t expected = 0;
return myState.CmpExchgStrongAcquire(expected, INTERRUPTIBLE_MUTEX_TAKEN);
}

void
InterruptibleMutex::Unlock()
{
// The happy-path is when there are no waiters. Then just unlock it and leave.
// Use 'release' to provide a sync point for the next owners to sync all the
// writes done by this thread.
uint8_t expected = INTERRUPTIBLE_MUTEX_TAKEN;
if (myState.CmpExchgStrongRelease(expected, 0))
return;

// So there were some waiters spotted. Try to hand the ownership directly to the
// first one.
MG_BOX_ASSERT(expected ==
(INTERRUPTIBLE_MUTEX_TAKEN | INTERRUPTIBLE_MUTEX_HAS_WAITERS));

myMutex.Lock();
MG_BOX_ASSERT(!myWaiters.IsEmpty());
// It is important to firstly pop the waiter, and only then remove the
// waiters-flag (if there was only one waiter). Otherwise the following might
// have happened:
// - Thread-1 takes the lock.
// - Thread-2 becomes a waiter and adds the waiter-flag.
// - Thread-1 frees the lock and (wrongly) signals the thread-2 that the ownership
// is given.
// - Thread-2 wakes up and tries to free the lock too.
// - Thread-2 sees that the waiters-flag is set, but there are no waiters. Broken
// assumption.
InterruptibleMutexWaiter* first = myWaiters.PopFirst();
// Even when nothing to do - still use 'release'. To give the next owner a way to
// sync all the writes done by this thread.
if (myWaiters.IsEmpty())
expected = myState.FetchBitAndRelease(~INTERRUPTIBLE_MUTEX_HAS_WAITERS);
else
expected = myState.FetchBitAndRelease(-1);
MG_BOX_ASSERT((expected & INTERRUPTIBLE_MUTEX_TAKEN) != 0);
MG_BOX_ASSERT((expected & INTERRUPTIBLE_MUTEX_HAS_WAITERS) != 0);
first->mySignal.Send();
myMutex.Unlock();
}

}
}
Loading