diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a0f0fcb..ae1e152 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -15,7 +15,8 @@ jobs: steps: - uses: actions/checkout@v2.3.4 - uses: olafurpg/setup-scala@v11 - - run: sbt ++2.13.5 docs/docusaurusCreateSite + - uses: coursier/cache-action@v5 + - run: sbt ++2.13.6 docs/docusaurusCreateSite test: name: ${{ matrix.command }} ${{ matrix.java }} @@ -25,22 +26,14 @@ jobs: matrix: java: ['adopt@1.8', 'adopt@1.11'] command: - - "++2.13.5 ci" + - "++2.13.6 ci" steps: - uses: actions/checkout@v2.3.4 - uses: olafurpg/setup-scala@v11 with: java-version: ${{ matrix.java }} - - name: Cache SBT coursier cache - uses: actions/cache@v2.1.6 - with: - path: ~/.cache/coursier/v1 - key: sbt-coursier-cache - - name: Cache SBT - uses: actions/cache@v2.1.6 - with: - path: ~/.sbt - key: sbt-${{ hashFiles('**/build.sbt') }} + - name: Cache + uses: coursier/cache-action@v6 - run: sbt ${{ matrix.command }} - uses: codecov/codecov-action@v1 if: startsWith(matrix.command, '++2.13') diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 400e3ab..aa09ac9 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -19,6 +19,6 @@ jobs: SONATYPE_PASSWORD: ${{ secrets.SONATYPE_PASSWORD }} SONATYPE_USERNAME: ${{ secrets.SONATYPE_USERNAME }} - name: Publish docs - run: sbt ++2.13.5 docs/docusaurusPublishGhpages + run: sbt ++2.13.6 docs/docusaurusPublishGhpages env: GITHUB_DEPLOY_KEY: ${{ secrets.GITHUB_DEPLOY_KEY }} diff --git a/build.sbt b/build.sbt index 3cf0ec7..ba0a495 100644 --- a/build.sbt +++ b/build.sbt @@ -68,6 +68,7 @@ lazy val commonSettings = Seq( scalaVersion := Versions.scala_213, crossScalaVersions := Seq(scalaVersion.value), Test / fork := true, + Test / parallelExecution := false, Compile / compile / wartremoverErrors ++= Warts.allBut(Wart.Any, Wart.Nothing), // false positive addCompilerPlugin("org.typelevel" %% "kind-projector" % "0.13.0" cross CrossVersion.full), addCompilerPlugin("com.olegpy" %% "better-monadic-for" % "0.3.1") diff --git a/core/src/test/scala/io/fmq/ContextSuite.scala b/core/src/test/scala/io/fmq/ContextSuite.scala index 5733b8a..021d901 100644 --- a/core/src/test/scala/io/fmq/ContextSuite.scala +++ b/core/src/test/scala/io/fmq/ContextSuite.scala @@ -1,8 +1,12 @@ package io.fmq +import java.util.concurrent.Executors + import cats.effect.{IO, Resource} import weaver.IOSuite +import scala.concurrent.{ExecutionContext, ExecutionContextExecutor} + trait ContextSuite extends IOSuite { override type Res = Context[IO] @@ -10,4 +14,9 @@ trait ContextSuite extends IOSuite { override def sharedResource: Resource[IO, Context[IO]] = Context.create[IO](1) + protected final def singleThreadExecutionContext: Resource[IO, ExecutionContextExecutor] = + Resource + .make(IO.delay(Executors.newSingleThreadExecutor()))(ec => IO.blocking(ec.shutdown())) + .map(ExecutionContext.fromExecutor) + } diff --git a/core/src/test/scala/io/fmq/proxy/ProxySuite.scala b/core/src/test/scala/io/fmq/proxy/ProxySuite.scala index 7deca6f..1b97092 100644 --- a/core/src/test/scala/io/fmq/proxy/ProxySuite.scala +++ b/core/src/test/scala/io/fmq/proxy/ProxySuite.scala @@ -1,8 +1,5 @@ package io.fmq.proxy -/* -import java.util.concurrent.Executors - import cats.effect.{IO, Resource} import cats.syntax.flatMap._ import io.fmq.ContextSuite @@ -15,16 +12,13 @@ import io.fmq.socket.reqrep.{Dealer, Reply, Request, Router} import io.fmq.syntax.literals._ import weaver.Expectations -import scala.concurrent.ExecutionContext import scala.concurrent.duration._ /** - * Tests are using IO.sleep(200.millis) to fix 'slow-joiner' problem. - * More details: http://zguide.zeromq.org/page:all#Missing-Message-Problem-Solver - */ -object ProxySuite extends ContextSuite { - - private def singleThreadContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10)) + * Tests are using IO.sleep(200.millis) to fix 'slow-joiner' problem. + * More details: http://zguide.zeromq.org/page:all#Missing-Message-Problem-Solver + */ +object ProxySuite1 extends ContextSuite { test("proxy messages in bidirectional way") { ctx => val frontendUri = inproc"://frontend" @@ -54,11 +48,16 @@ object ProxySuite extends ContextSuite { (for { (front, back) <- createProxySockets proxy <- ctx.proxy.bidirectional(front, back) - _ <- proxy.start(singleThreadContext) + singleThreadEc <- singleThreadExecutionContext + _ <- proxy.start(singleThreadEc) (client, server) <- createReqRepSockets } yield (client, server)).use((program _).tupled) } +} + +object ProxySuite2 extends ContextSuite { + test("proxy message in unidirectional way") { ctx => val frontendUri = inproc"://frontend1" val backendUri = inproc"://backend1" @@ -83,10 +82,15 @@ object ProxySuite extends ContextSuite { push <- Resource.suspend(ctx.createPush.map(_.connect(controlUri))) control <- Resource.pure[IO, Control[IO]](Control.push(push)) proxy <- ctx.proxy.unidirectional(subscriberProxy, publisherProxy, Some(control)) - _ <- proxy.start(singleThreadContext) + singleThreadEc <- singleThreadExecutionContext + _ <- proxy.start(singleThreadEc) } yield (publisher, subscriber, pull)).use((program _).tupled) } +} + +object ProxySuite3 extends ContextSuite { + test("control socket observed messages") { ctx => val frontendUri = inproc"://frontend2" val backendUri = inproc"://backend2" @@ -131,11 +135,16 @@ object ProxySuite extends ContextSuite { (pull, push) <- createControlSockets control <- Resource.pure[IO, Control[IO]](Control.push(push)) proxy <- ctx.proxy.bidirectional(front, back, Some(control), Some(control)) - _ <- proxy.start(singleThreadContext) + singleThreadEc <- singleThreadExecutionContext + _ <- proxy.start(singleThreadEc) (client, server) <- createReqRepSockets } yield (client, server, pull)).use((program _).tupled) } +} + +object ProxySuite4 extends ContextSuite { + test("separate control sockets observed messages") { ctx => val frontendUri = inproc"://frontend3" val backendUri = inproc"://backend3" @@ -188,11 +197,16 @@ object ProxySuite extends ContextSuite { controlIn <- Resource.pure[IO, Control[IO]](Control.push(pushIn)) controlOut <- Resource.pure[IO, Control[IO]](Control.push(pushOut)) proxy <- ctx.proxy.bidirectional(front, back, Some(controlIn), Some(controlOut)) - _ <- proxy.start(singleThreadContext) + singleThreadEc <- singleThreadExecutionContext + _ <- proxy.start(singleThreadEc) (client, server) <- createReqRepSockets } yield (client, server, pullIn, pullOut)).use((program _).tupled) } +} + +object ProxySuite5 extends ContextSuite { + test("start new proxy after termination") { ctx => val frontendUri = inproc"://frontend4" val backendUri = inproc"://backend4" @@ -221,7 +235,9 @@ object ProxySuite extends ContextSuite { } def program(proxy: Proxy.Configured[IO]): IO[Expectations] = - proxy.start(singleThreadContext).use(_ => verifyProxy) >> proxy.start(singleThreadContext).use(_ => verifyProxy) + singleThreadExecutionContext.use { singleThreadEc => + proxy.start(singleThreadEc).use(_ => verifyProxy) >> proxy.start(singleThreadEc).use(_ => verifyProxy) + } (for { (front, back) <- createProxySockets @@ -230,4 +246,3 @@ object ProxySuite extends ContextSuite { } } - */