Skip to content
This repository was archived by the owner on Nov 5, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 5 additions & 12 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ jobs:
steps:
- uses: actions/checkout@v2.3.4
- uses: olafurpg/setup-scala@v11
- run: sbt ++2.13.5 docs/docusaurusCreateSite
- uses: coursier/cache-action@v5
- run: sbt ++2.13.6 docs/docusaurusCreateSite

test:
name: ${{ matrix.command }} ${{ matrix.java }}
Expand All @@ -25,22 +26,14 @@ jobs:
matrix:
java: ['adopt@1.8', 'adopt@1.11']
command:
- "++2.13.5 ci"
- "++2.13.6 ci"
steps:
- uses: actions/checkout@v2.3.4
- uses: olafurpg/setup-scala@v11
with:
java-version: ${{ matrix.java }}
- name: Cache SBT coursier cache
uses: actions/cache@v2.1.6
with:
path: ~/.cache/coursier/v1
key: sbt-coursier-cache
- name: Cache SBT
uses: actions/cache@v2.1.6
with:
path: ~/.sbt
key: sbt-${{ hashFiles('**/build.sbt') }}
- name: Cache
uses: coursier/cache-action@v6
- run: sbt ${{ matrix.command }}
- uses: codecov/codecov-action@v1
if: startsWith(matrix.command, '++2.13')
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@ jobs:
SONATYPE_PASSWORD: ${{ secrets.SONATYPE_PASSWORD }}
SONATYPE_USERNAME: ${{ secrets.SONATYPE_USERNAME }}
- name: Publish docs
run: sbt ++2.13.5 docs/docusaurusPublishGhpages
run: sbt ++2.13.6 docs/docusaurusPublishGhpages
env:
GITHUB_DEPLOY_KEY: ${{ secrets.GITHUB_DEPLOY_KEY }}
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ lazy val commonSettings = Seq(
scalaVersion := Versions.scala_213,
crossScalaVersions := Seq(scalaVersion.value),
Test / fork := true,
Test / parallelExecution := false,
Compile / compile / wartremoverErrors ++= Warts.allBut(Wart.Any, Wart.Nothing), // false positive
addCompilerPlugin("org.typelevel" %% "kind-projector" % "0.13.0" cross CrossVersion.full),
addCompilerPlugin("com.olegpy" %% "better-monadic-for" % "0.3.1")
Expand Down
9 changes: 9 additions & 0 deletions core/src/test/scala/io/fmq/ContextSuite.scala
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
package io.fmq

import java.util.concurrent.Executors

import cats.effect.{IO, Resource}
import weaver.IOSuite

import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}

trait ContextSuite extends IOSuite {

override type Res = Context[IO]

override def sharedResource: Resource[IO, Context[IO]] =
Context.create[IO](1)

protected final def singleThreadExecutionContext: Resource[IO, ExecutionContextExecutor] =
Resource
.make(IO.delay(Executors.newSingleThreadExecutor()))(ec => IO.blocking(ec.shutdown()))
.map(ExecutionContext.fromExecutor)

}
47 changes: 31 additions & 16 deletions core/src/test/scala/io/fmq/proxy/ProxySuite.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
package io.fmq.proxy

/*
import java.util.concurrent.Executors

import cats.effect.{IO, Resource}
import cats.syntax.flatMap._
import io.fmq.ContextSuite
Expand All @@ -15,16 +12,13 @@ import io.fmq.socket.reqrep.{Dealer, Reply, Request, Router}
import io.fmq.syntax.literals._
import weaver.Expectations

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

/**
* Tests are using IO.sleep(200.millis) to fix 'slow-joiner' problem.
* More details: http://zguide.zeromq.org/page:all#Missing-Message-Problem-Solver
*/
object ProxySuite extends ContextSuite {

private def singleThreadContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))
* Tests are using IO.sleep(200.millis) to fix 'slow-joiner' problem.
* More details: http://zguide.zeromq.org/page:all#Missing-Message-Problem-Solver
*/
object ProxySuite1 extends ContextSuite {

test("proxy messages in bidirectional way") { ctx =>
val frontendUri = inproc"://frontend"
Expand Down Expand Up @@ -54,11 +48,16 @@ object ProxySuite extends ContextSuite {
(for {
(front, back) <- createProxySockets
proxy <- ctx.proxy.bidirectional(front, back)
_ <- proxy.start(singleThreadContext)
singleThreadEc <- singleThreadExecutionContext
_ <- proxy.start(singleThreadEc)
(client, server) <- createReqRepSockets
} yield (client, server)).use((program _).tupled)
}

}

object ProxySuite2 extends ContextSuite {

test("proxy message in unidirectional way") { ctx =>
val frontendUri = inproc"://frontend1"
val backendUri = inproc"://backend1"
Expand All @@ -83,10 +82,15 @@ object ProxySuite extends ContextSuite {
push <- Resource.suspend(ctx.createPush.map(_.connect(controlUri)))
control <- Resource.pure[IO, Control[IO]](Control.push(push))
proxy <- ctx.proxy.unidirectional(subscriberProxy, publisherProxy, Some(control))
_ <- proxy.start(singleThreadContext)
singleThreadEc <- singleThreadExecutionContext
_ <- proxy.start(singleThreadEc)
} yield (publisher, subscriber, pull)).use((program _).tupled)
}

}

object ProxySuite3 extends ContextSuite {

test("control socket observed messages") { ctx =>
val frontendUri = inproc"://frontend2"
val backendUri = inproc"://backend2"
Expand Down Expand Up @@ -131,11 +135,16 @@ object ProxySuite extends ContextSuite {
(pull, push) <- createControlSockets
control <- Resource.pure[IO, Control[IO]](Control.push(push))
proxy <- ctx.proxy.bidirectional(front, back, Some(control), Some(control))
_ <- proxy.start(singleThreadContext)
singleThreadEc <- singleThreadExecutionContext
_ <- proxy.start(singleThreadEc)
(client, server) <- createReqRepSockets
} yield (client, server, pull)).use((program _).tupled)
}

}

object ProxySuite4 extends ContextSuite {

test("separate control sockets observed messages") { ctx =>
val frontendUri = inproc"://frontend3"
val backendUri = inproc"://backend3"
Expand Down Expand Up @@ -188,11 +197,16 @@ object ProxySuite extends ContextSuite {
controlIn <- Resource.pure[IO, Control[IO]](Control.push(pushIn))
controlOut <- Resource.pure[IO, Control[IO]](Control.push(pushOut))
proxy <- ctx.proxy.bidirectional(front, back, Some(controlIn), Some(controlOut))
_ <- proxy.start(singleThreadContext)
singleThreadEc <- singleThreadExecutionContext
_ <- proxy.start(singleThreadEc)
(client, server) <- createReqRepSockets
} yield (client, server, pullIn, pullOut)).use((program _).tupled)
}

}

object ProxySuite5 extends ContextSuite {

test("start new proxy after termination") { ctx =>
val frontendUri = inproc"://frontend4"
val backendUri = inproc"://backend4"
Expand Down Expand Up @@ -221,7 +235,9 @@ object ProxySuite extends ContextSuite {
}

def program(proxy: Proxy.Configured[IO]): IO[Expectations] =
proxy.start(singleThreadContext).use(_ => verifyProxy) >> proxy.start(singleThreadContext).use(_ => verifyProxy)
singleThreadExecutionContext.use { singleThreadEc =>
proxy.start(singleThreadEc).use(_ => verifyProxy) >> proxy.start(singleThreadEc).use(_ => verifyProxy)
}

(for {
(front, back) <- createProxySockets
Expand All @@ -230,4 +246,3 @@ object ProxySuite extends ContextSuite {
}

}
*/