Skip to content

Commit e46c31a

Browse files
committed
Fix race condition in ScalaCompile by selectively interrupting futures with Thread.interrupt
Prior to this change there was a bug when ScalaCompile actions were used with dynamic execution. The bug would cause builds to fail with the error message "compiler mirror not found". It is my understanding this message typically indicates a problem loading the Scala Library jar during compilation. The error would always occur in the local multiplex worker's logs and not the remote logs. This commit fixes this error by not using Thread.interrupt when cancelling the future running the ScalaCompile action. My hypothesis on why this error occurs is as follows: imagine one of the classloaders in the shared ScalaInstance is in use on Thread A. That thread gets cancelled and interrupted via Thread.interrupt. This causes some kind of persistent error inside either the classloader or another part of Zinc or the Scala compiler. We correctly ignore the error on Thread A because the request it was handling was cancelled. Another thread is working a non-cancelled request, it uses that same ScalaInstance, hit that persistent error, and fails. By not using Thread.interrupt we don't trigger the persistent error and thus avoid the bug.
1 parent 908c877 commit e46c31a

File tree

17 files changed

+188
-112
lines changed

17 files changed

+188
-112
lines changed

rules/scala_proto/private/ScalaProtoWorker.scala

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import higherkindness.rules_scala.common.args.implicits.*
66
import higherkindness.rules_scala.common.interrupt.InterruptUtil
77
import higherkindness.rules_scala.common.error.AnnexWorkerError
88
import higherkindness.rules_scala.common.sandbox.SandboxUtil
9-
import higherkindness.rules_scala.common.worker.WorkerMain
9+
import higherkindness.rules_scala.common.worker.{WorkerMain, WorkTask}
1010
import java.io.{File, PrintStream}
1111
import java.nio.file.{Files, Path, Paths}
1212
import java.util.Collections
@@ -72,9 +72,12 @@ object ScalaProtoWorker extends WorkerMain[Unit] {
7272

7373
override def init(args: Option[Array[String]]): Unit = ()
7474

75-
protected def work(ctx: Unit, args: Array[String], out: PrintStream, workDir: Path, verbosity: Int): Unit = {
76-
val workRequest = ScalaProtoRequest(workDir, ArgsUtil.parseArgsOrFailSafe(args, argParser, out))
77-
InterruptUtil.throwIfInterrupted()
75+
protected def work(task: WorkTask[Unit]): Unit = {
76+
val workRequest = ScalaProtoRequest(
77+
task.workDir,
78+
ArgsUtil.parseArgsOrFailSafe(task.args, argParser, task.output),
79+
)
80+
InterruptUtil.throwIfInterrupted(task.isCancelled)
7881

7982
val scalaOut = workRequest.outputDir
8083
Files.createDirectories(scalaOut)
@@ -93,7 +96,7 @@ object ScalaProtoWorker extends WorkerMain[Unit] {
9396
}
9497
}
9598

96-
InterruptUtil.throwIfInterrupted()
99+
InterruptUtil.throwIfInterrupted(task.isCancelled)
97100
val exitCode = ProtocBridge.runWithGenerators(
98101
new MyProtocRunner,
99102
namedGenerators = List("scala" -> ScalaPbCodeGenerator),
@@ -102,7 +105,7 @@ object ScalaProtoWorker extends WorkerMain[Unit] {
102105
if (exitCode != 0) {
103106
throw new AnnexWorkerError(exitCode)
104107
}
105-
InterruptUtil.throwIfInterrupted()
108+
InterruptUtil.throwIfInterrupted(task.isCancelled)
106109
}
107110

108111
}

rules/scalafmt/scalafmt/ScalafmtRunner.scala

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import higherkindness.rules_scala.common.args.ArgsUtil
44
import higherkindness.rules_scala.common.args.ArgsUtil.PathArgumentType
55
import higherkindness.rules_scala.common.interrupt.InterruptUtil
66
import higherkindness.rules_scala.common.sandbox.SandboxUtil
7-
import higherkindness.rules_scala.common.worker.WorkerMain
7+
import higherkindness.rules_scala.common.worker.{WorkTask, WorkerMain}
88
import higherkindness.rules_scala.workers.common.Color
99
import java.io.{File, PrintStream}
1010
import java.nio.file.{Files, Path}
@@ -45,16 +45,19 @@ object ScalafmtRunner extends WorkerMain[Unit] {
4545

4646
protected def init(args: Option[Array[String]]): Unit = {}
4747

48-
protected def work(worker: Unit, args: Array[String], out: PrintStream, workDir: Path, verbosity: Int): Unit = {
49-
val workRequest = ScalafmtRequest(workDir, ArgsUtil.parseArgsOrFailSafe(args, argParser, out))
50-
InterruptUtil.throwIfInterrupted()
48+
protected def work(task: WorkTask[Unit]): Unit = {
49+
val workRequest = ScalafmtRequest(
50+
task.workDir,
51+
ArgsUtil.parseArgsOrFailSafe(task.args, argParser, task.output),
52+
)
53+
InterruptUtil.throwIfInterrupted(task.isCancelled)
5154

5255
val source = FileOps.readFile(workRequest.inputFile)(Codec.UTF8)
5356

5457
val config = ScalafmtConfig.fromHoconFile(workRequest.configFile).get
5558
@tailrec
5659
def format(code: String): String = {
57-
InterruptUtil.throwIfInterrupted()
60+
InterruptUtil.throwIfInterrupted(task.isCancelled)
5861
val formatted = Scalafmt.format(code, config).get
5962
if (code == formatted) code else format(formatted)
6063
}
@@ -76,7 +79,7 @@ object ScalafmtRunner extends WorkerMain[Unit] {
7679
}
7780

7881
Files.write(workRequest.outputFile, output.getBytes)
79-
InterruptUtil.throwIfInterrupted()
82+
InterruptUtil.throwIfInterrupted(task.isCancelled)
8083
}
8184

8285
}
Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
package higherkindness.rules_scala
22
package common.interrupt
33

4+
import java.util.concurrent.CancellationException
5+
46
object InterruptUtil {
5-
def throwIfInterrupted(): Unit = {
7+
def throwIfInterrupted(isCancelled: Function0[Boolean]): Unit = {
68
if (Thread.interrupted()) {
7-
throw new InterruptedException("WorkRequest was cancelled.")
9+
throw new InterruptedException("WorkRequest was cancelled via Thread interruption.")
10+
} else if (isCancelled()) {
11+
throw new CancellationException("WorkRequest was cancelled via FutureTask cancellation.")
812
}
913
}
1014
}

src/main/scala/higherkindness/rules_scala/common/worker/CancellableTask.scala

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,12 @@ import scala.util.Try
1616
* Heavily inspired by the following: https://github.com/NthPortal/cancellable-task/tree/master
1717
* https://stackoverflow.com/a/39986418/6442597
1818
*/
19-
class CancellableTask[S] private (fn: => S) {
19+
class CancellableTask[S] private (fn: Function1[Function0[Boolean], S]) {
2020
private val promise = Promise[S]()
2121
val future: Future[S] = promise.future
2222

2323
private val fnCallable = new Callable[S]() {
24-
def call(): S = fn
24+
def call(): S = fn(isCancelled)
2525
}
2626

2727
private val task = new FutureTaskWaitOnCancel[S](fnCallable) {
@@ -39,10 +39,16 @@ class CancellableTask[S] private (fn: => S) {
3939
def cancel(mayInterruptIfRunning: Boolean): Boolean = task.cancel(mayInterruptIfRunning)
4040

4141
def execute(executionContext: ExecutionContext): Unit = executionContext.execute(task)
42+
43+
def isCancelled(): Boolean = task.isCancelled()
4244
}
4345

4446
object CancellableTask {
4547
def apply[S](fn: => S): CancellableTask[S] = {
48+
new CancellableTask((_: Function0[Boolean]) => fn)
49+
}
50+
51+
def apply[S](fn: Function1[Function0[Boolean], S]): CancellableTask[S] = {
4652
new CancellableTask(fn)
4753
}
4854
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package higherkindness.rules_scala
2+
package common.worker
3+
4+
import java.io.PrintStream
5+
import java.nio.file.Path
6+
7+
case class WorkTask[S](
8+
context: S,
9+
args: Array[String],
10+
output: PrintStream,
11+
workDir: Path,
12+
verbosity: Int,
13+
isCancelled: Function0[Boolean],
14+
)

src/main/scala/higherkindness/rules_scala/common/worker/WorkerMain.scala

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ package common.worker
44
import common.error.{AnnexDuplicateActiveRequestException, AnnexWorkerError}
55
import com.google.devtools.build.lib.worker.WorkerProtocol
66
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream, PrintStream}
7+
import java.nio.channels.ClosedByInterruptException
78
import java.nio.file.{Path, Paths}
89
import java.util.concurrent.{Callable, CancellationException, ConcurrentHashMap, ForkJoinPool, FutureTask}
910
import scala.annotation.tailrec
@@ -15,10 +16,27 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream
1516

1617
protected def init(args: Option[Array[String]]): S
1718

18-
protected def work(ctx: S, args: Array[String], out: PrintStream, workDir: Path, verbosity: Int): Unit
19-
19+
/**
20+
* isCancelled is used to determine whether the FutureTask that is executing this work request has been cancelled. If
21+
* it is safe to Thread.interrupt a process, then that is done and can be checked. Not all workers can be
22+
* Thread.interrupted safely.
23+
*
24+
* TODO(James): document the rest of this function
25+
*/
26+
protected def work(workRequest: WorkTask[S]): Unit
27+
28+
/**
29+
* Indicates whether this program is being executed as a worker or as a regular process. It is a var because we won't
30+
* know until runtime which one it is.
31+
*/
2032
protected var isWorker = false
2133

34+
/**
35+
* Used to determine whether to interrupt the FutureTasks being executed by this worker using Thread.interrupt or not.
36+
* It's safe to interrupt many things with Thread.interrupt, but not all things.
37+
*/
38+
protected val mayInterruptWorkerTasks = true
39+
2240
final def main(args: Array[String]): Unit = {
2341
args.toList match {
2442
case "--persistent_worker" :: args =>
@@ -113,7 +131,9 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream
113131
Option(activeRequests.get(requestId)).foreach { activeRequest =>
114132
// Cancel will wait for the thread to complete or be interrupted, so we do it in a future
115133
// to prevent blocking the worker from processing more requests
116-
Future(activeRequest.cancel(mayInterruptIfRunning = true))(scala.concurrent.ExecutionContext.global)
134+
Future(activeRequest.cancel(mayInterruptIfRunning = mayInterruptWorkerTasks))(
135+
scala.concurrent.ExecutionContext.global,
136+
)
117137
}
118138
} else {
119139
val args = request.getArgumentsList.toArray(Array.empty[String])
@@ -131,13 +151,13 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream
131151
maybeOut.map(_.flush())
132152
}
133153

134-
val workTask = CancellableTask {
154+
def doWork(isCancelled: Function0[Boolean]) = {
135155
val outStream = new ByteArrayOutputStream()
136156
val out = new PrintStream(outStream)
137157
maybeOutStream = Some(outStream)
138158
maybeOut = Some(out)
139159
try {
140-
work(ctx, args, out, sandboxDir, verbosity)
160+
work(WorkTask(ctx, args, out, sandboxDir, verbosity, isCancelled))
141161
0
142162
} catch {
143163
case e @ AnnexWorkerError(code, _, _) =>
@@ -146,6 +166,8 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream
146166
}
147167
}
148168

169+
val workTask = CancellableTask(doWork)
170+
149171
workTask.future
150172
.andThen {
151173
// Work task succeeded or failed in an expected way
@@ -178,7 +200,7 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream
178200
}
179201

180202
// Task successfully cancelled
181-
case Failure(e: CancellationException) =>
203+
case Failure(e @ (_: CancellationException | _: ClosedByInterruptException)) =>
182204
flushOut()
183205
writeResponse(requestId, None, None, wasCancelled = true)
184206
logVerbose(
@@ -235,11 +257,14 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream
235257
val returnCode =
236258
try {
237259
work(
238-
init(args = None),
239-
args.toArray,
240-
out,
241-
workDir = Path.of(""),
242-
verbosity = 0,
260+
WorkTask(
261+
init(args = None),
262+
args.toArray,
263+
out,
264+
workDir = Path.of(""),
265+
verbosity = 0,
266+
isCancelled = () => false,
267+
),
243268
)
244269

245270
0
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
package higherkindness.rules_scala
22
package workers.bloop.compile
33

4-
import common.worker.WorkerMain
4+
import common.worker.{WorkTask, WorkerMain}
55

66
import bloop.Bloop
77
import java.io.PrintStream
88
import java.nio.file.Path
99

1010
object BloopRunner extends WorkerMain[Unit] {
1111
override def init(args: Option[Array[String]]): Unit = ()
12-
override def work(ctx: Unit, args: Array[String], out: PrintStream, workDir: Path, verbosity: Int): Unit = Bloop
12+
override def work(workRequest: WorkTask[Unit]): Unit = Bloop
1313
}

src/main/scala/higherkindness/rules_scala/workers/deps/DepsRunner.scala

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import common.args.implicits.*
77
import common.error.AnnexWorkerError
88
import common.interrupt.InterruptUtil
99
import common.sandbox.SandboxUtil
10-
import common.worker.WorkerMain
10+
import common.worker.{WorkTask, WorkerMain}
1111
import workers.common.AnnexMapper
1212
import workers.common.FileUtil
1313
import java.io.{File, PrintStream}
@@ -111,9 +111,12 @@ object DepsRunner extends WorkerMain[Unit] {
111111

112112
override def init(args: Option[Array[String]]): Unit = ()
113113

114-
override def work(ctx: Unit, args: Array[String], out: PrintStream, workDir: Path, verbosity: Int): Unit = {
115-
val workRequest = DepsRunnerRequest(workDir, ArgsUtil.parseArgsOrFailSafe(args, argParser, out))
116-
InterruptUtil.throwIfInterrupted()
114+
override def work(task: WorkTask[Unit]): Unit = {
115+
val workRequest = DepsRunnerRequest(
116+
task.workDir,
117+
ArgsUtil.parseArgsOrFailSafe(task.args, argParser, task.output),
118+
)
119+
InterruptUtil.throwIfInterrupted(task.isCancelled)
117120

118121
val groupLabelToJarPaths = workRequest.groups.map { group =>
119122
group.label -> group.jars
@@ -133,10 +136,10 @@ object DepsRunner extends WorkerMain[Unit] {
133136

134137
potentialLabels.collect(groupLabelToJarPaths).flatten
135138
}
136-
val readWriteMappers = AnnexMapper.mappers(workDir, isIncremental = false)
139+
val readWriteMappers = AnnexMapper.mappers(task.workDir, isIncremental = false)
137140
val readMapper = readWriteMappers.getReadMapper()
138141

139-
InterruptUtil.throwIfInterrupted()
142+
InterruptUtil.throwIfInterrupted(task.isCancelled)
140143
val usedPaths = Files
141144
.readAllLines(workRequest.usedDepsFile)
142145
.asScala
@@ -165,7 +168,7 @@ object DepsRunner extends WorkerMain[Unit] {
165168
Nil
166169
}
167170

168-
InterruptUtil.throwIfInterrupted()
171+
InterruptUtil.throwIfInterrupted(task.isCancelled)
169172
val labelsToAdd = if (workRequest.checkDirect) {
170173
(usedPaths -- (workRequest.directDepLabels :++ workRequest.unusedDepWhitelist).flatMap(pathsForLabel))
171174
.flatMap { path =>
@@ -198,6 +201,6 @@ object DepsRunner extends WorkerMain[Unit] {
198201
throw new AnnexWorkerError(1, errorMessage.result())
199202
}
200203

201-
InterruptUtil.throwIfInterrupted()
204+
InterruptUtil.throwIfInterrupted(task.isCancelled)
202205
}
203206
}

src/main/scala/higherkindness/rules_scala/workers/jacoco/instrumenter/JacocoInstrumenter.scala

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import common.args.ArgsUtil
55
import common.error.AnnexWorkerError
66
import common.interrupt.InterruptUtil
77
import common.sandbox.SandboxUtil
8-
import common.worker.WorkerMain
8+
import common.worker.{WorkTask, WorkerMain}
99
import java.io.{BufferedInputStream, BufferedOutputStream, PrintStream}
1010
import java.net.URI
1111
import java.nio.file.Files
@@ -68,14 +68,17 @@ object JacocoInstrumenter extends WorkerMain[Unit] {
6868

6969
override def init(args: Option[Array[String]]): Unit = ()
7070

71-
override def work(ctx: Unit, args: Array[String], out: PrintStream, workDir: Path, verbosity: Int): Unit = {
72-
val workRequest = JacocoRequest(workDir, ArgsUtil.parseArgsOrFailSafe(args, argParser, out))
71+
override def work(task: WorkTask[Unit]): Unit = {
72+
val workRequest = JacocoRequest(
73+
task.workDir,
74+
ArgsUtil.parseArgsOrFailSafe(task.args, argParser, task.output),
75+
)
7376

7477
val jacoco = new Instrumenter(new OfflineInstrumentationAccessGenerator)
7578

7679
workRequest.jars.foreach { case (inPath, outPath) =>
7780
Using.Manager { use =>
78-
InterruptUtil.throwIfInterrupted()
81+
InterruptUtil.throwIfInterrupted(task.isCancelled)
7982

8083
val inFS = use(FileSystems.newFileSystem(inPath, null: ClassLoader))
8184
val outFS =
@@ -107,6 +110,6 @@ object JacocoInstrumenter extends WorkerMain[Unit] {
107110
}.get
108111
}
109112

110-
InterruptUtil.throwIfInterrupted()
113+
InterruptUtil.throwIfInterrupted(task.isCancelled)
111114
}
112115
}

0 commit comments

Comments
 (0)