@@ -61,14 +61,24 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream
6161 val ec = ExecutionContext .fromExecutor(fjp)
6262
6363 // Map of request id to the runnable responsible for executing that request id
64- val activeRequests = new ConcurrentHashMap [Int , CancellableTask [Int ]](poolSize)
64+ val activeRequests = new ConcurrentHashMap [Int , ( WorkerProtocol . WorkRequest , CancellableTask [Int ]) ](poolSize)
6565
6666 def writeResponse (
6767 requestId : Int ,
6868 maybeOutStream : Option [OutputStream ],
6969 maybeExitCode : Option [Int ],
7070 wasCancelled : Boolean = false ,
7171 ): Unit = {
72+ // Remove the request from our book keeping right before we respond to Bazel. If
73+ // we respond to Bazel about the request before removing it, then there is a race:
74+ // Bazel could make a request with the same requestId to this worker before the
75+ // requestId is removed from the worker's book keeping.
76+ //
77+ // Ideally Bazel will not send a request to this worker with the same requestId
78+ // as another request before we've responded to the original request. If that
79+ // happens, then there's a race regardless of what we do.
80+ activeRequests.remove(requestId)
81+
7282 // Defined here so all writes to stdout are synchronized
7383 stdout.synchronized {
7484 val builder = WorkerProtocol .WorkResponse .newBuilder
@@ -88,8 +98,6 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream
8898 .build()
8999 .writeDelimitedTo(stdout)
90100 }
91-
92- activeRequests.remove(requestId)
93101 }
94102
95103 /**
@@ -128,10 +136,10 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream
128136
129137 // From the Bazel doc: "The server may send cancel requests for requests that the worker
130138 // has already responded to, in which case the cancel request must be ignored."
131- Option (activeRequests.get(requestId)).foreach { activeRequest =>
139+ Option (activeRequests.get(requestId)).foreach { case (_, workTask) =>
132140 // Cancel will wait for the thread to complete or be interrupted, so we do it in a future
133141 // to prevent blocking the worker from processing more requests
134- Future (activeRequest .cancel(mayInterruptIfRunning = mayInterruptWorkerTasks))(
142+ Future (workTask .cancel(mayInterruptIfRunning = mayInterruptWorkerTasks))(
135143 scala.concurrent.ExecutionContext .global,
136144 )
137145 }
@@ -227,9 +235,14 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream
227235 // for this requestId. If that's the case, we have a book keeping error or there are
228236 // two active requests with the same ID. Either of which is not good and something we
229237 // should just crash on.
230- if (activeRequests.putIfAbsent(requestId, workTask) != null ) {
238+ val alreadyActiveRequest = activeRequests.putIfAbsent(requestId, (request, workTask))
239+ if (alreadyActiveRequest != null ) {
240+ val (activeRequest, _) = alreadyActiveRequest
231241 throw new AnnexDuplicateActiveRequestException (
232- s " Received a WorkRequest with an already active request id: ${requestId}" ,
242+ s """ Received a WorkRequest with an already active request id: ${requestId}.
243+ Currently active request: ${activeRequest.toString}
244+ New request with the same id: ${request.toString}
245+ """ ,
233246 )
234247 } else {
235248 workTask.execute(ec)
0 commit comments