@@ -77,6 +77,13 @@ class CalcClient final
7777 std::function<void (int64_t )>&& aOnComplete)
7878 {
7979 MG_LOG_INFO (" Client.Submit" , " new request" );
80+ // This function can be called by any thread in our code. Not only by IOCore
81+ // worker serving this client's socket right now. Need to be careful with thread
82+ // safety here.
83+ //
84+ // The solution we can do is have a thread-safe queue of requests submitted to the
85+ // client. When the queue gets populated, we wakeup the client to pop the queue
86+ // and handle the requests.
8087 CalcRequest* req = new CalcRequest ();
8188 req->myOp = aOp;
8289 req->myArg1 = aArg1;
@@ -119,12 +126,14 @@ class CalcClient final
119126 {
120127 CalcRequest* tail;
121128 CalcRequest* head = myFrontQueue.PopAll (tail);
129+ // Check if spurious wakeup.
122130 if (head == nullptr )
123131 return ;
124132
125- // Save the requests for later response handling.
133+ // Move the requests from the thread-safe front queue to the local plain simple
134+ // list. For later response handling.
126135 myQueue.Append (head, tail);
127- // Send requests in bulk.
136+ // Send requests in bulk, in one stream of bytes .
128137 mg::net::BufferStream data;
129138 while (head != nullptr )
130139 {
@@ -181,9 +190,14 @@ class CalcClient final
181190 }
182191
183192 mg::aio::TCPSocket* mySock;
184- // Queue of sent requests waiting for their responses.
193+
194+ // Queue of sent requests waiting for their responses. It is not thread-safe, just a
195+ // normal list. But it is only accessed inside client's socket context. Hence never is
196+ // touched by more than one thread. No need for a mutex.
185197 mg::box::ForwardList<CalcRequest> myQueue;
186- // Queue if requests submitted by other threads. Not yet sent to the network.
198+
199+ // Queue if requests submitted by other threads. Not yet sent to the network. It is
200+ // thread-safe.
187201 mg::box::MultiProducerQueueIntrusive<CalcRequest> myFrontQueue;
188202};
189203
@@ -360,7 +374,8 @@ class MyRequest
360374 aSelf->myTask .PostSignal ();
361375 });
362376 // Wait for the signal in a yielding loop, to handle potential spurious wakeups.
363- while (!co_await aSelf->myTask .AsyncReceiveSignal ());
377+ while (!co_await aSelf->myTask .AsyncReceiveSignal ())
378+ aSelf->myTask .SetWait ();
364379 //
365380 MG_LOG_INFO (" MyRequest.Execute" , " %d: got response %lld" , myID, (long long )res);
366381
@@ -371,7 +386,8 @@ class MyRequest
371386 res = aRes;
372387 aSelf->myTask .PostSignal ();
373388 });
374- while (!co_await aSelf->myTask .AsyncReceiveSignal ());
389+ while (!co_await aSelf->myTask .AsyncReceiveSignal ())
390+ aSelf->myTask .SetWait ();
375391 MG_LOG_INFO (" MyRequest.Execute" , " %d: got response %lld" , myID, (long long )res);
376392
377393 // 'delete this' + co_return wouldn't work here. Because deletion of the self
0 commit comments