Skip to content

Commit 797fd83

Browse files
authored
Merge pull request #78 from AVSystem/rpc-rest
RPC based REST
2 parents 2f53e60 + 5f9211b commit 797fd83

File tree

68 files changed

+3730
-674
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

68 files changed

+3730
-674
lines changed

.travis.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
language: scala
22

33
scala:
4-
- 2.11.11
4+
- 2.11.12
55
- 2.12.6
66

77
jdk:

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
derivation for case classes and sealed hierarchies
1515
* [RPC framework](docs/RPCFramework.md): **typesafe** RPC/proxy framework used in particular by
1616
[Udash Framework](http://guide.udash.io/#/rpc) for client-server communication
17+
* [REST framework](docs/REST.md) based on RPC framework
1718
* Better enumeration support for Scala -
1819
[`ValueEnum`](http://avsystem.github.io/scala-commons/api/com/avsystem/commons/misc/ValueEnum.html),
1920
[`SealedEnumCompanion`](http://avsystem.github.io/scala-commons/api/com/avsystem/commons/misc/SealedEnumCompanion.html),

build.sbt

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ cancelable in Global := true
99
val forIdeaImport = System.getProperty("idea.managed", "false").toBoolean && System.getProperty("idea.runid") == null
1010

1111
// for binary compatibility checking
12-
val previousVersion = "1.27.3"
12+
val previousCompatibleVersions = Set.empty[String]
1313

1414
val silencerVersion = "1.1"
1515
val guavaVersion = "23.0"
@@ -100,8 +100,8 @@ val jvmCommonSettings = commonSettings ++ Seq(
100100
libraryDependencies ++= Seq(
101101
"org.apache.commons" % "commons-io" % commonsIoVersion % Test,
102102
),
103-
mimaPreviousArtifacts := {
104-
Set(organization.value % s"${name.value}_${scalaBinaryVersion.value}" % previousVersion)
103+
mimaPreviousArtifacts := previousCompatibleVersions.map { previousVersion =>
104+
organization.value % s"${name.value}_${scalaBinaryVersion.value}" % previousVersion
105105
},
106106
)
107107

@@ -192,7 +192,7 @@ lazy val `commons-macros` = project.settings(
192192
)
193193

194194
lazy val `commons-core` = project
195-
.dependsOn(`commons-macros`, `commons-annotations`)
195+
.dependsOn(`commons-macros`, `commons-annotations` % CompileAndTest)
196196
.settings(
197197
jvmCommonSettings,
198198
sourceDirsSettings(_ / "jvm"),
@@ -206,7 +206,7 @@ lazy val `commons-core` = project
206206
lazy val `commons-core-js` = project.in(`commons-core`.base / "js")
207207
.enablePlugins(ScalaJSPlugin)
208208
.configure(p => if (forIdeaImport) p.dependsOn(`commons-core`) else p)
209-
.dependsOn(`commons-macros`, `commons-annotations-js`)
209+
.dependsOn(`commons-macros`, `commons-annotations-js` % CompileAndTest)
210210
.settings(
211211
jsCommonSettings,
212212
name := (name in `commons-core`).value,
@@ -229,13 +229,16 @@ lazy val `commons-analyzer` = project
229229
)
230230

231231
lazy val `commons-jetty` = project
232-
.dependsOn(`commons-core`)
232+
.dependsOn(`commons-core` % CompileAndTest)
233233
.settings(
234234
jvmCommonSettings,
235235
libraryDependencies ++= Seq(
236236
"org.eclipse.jetty" % "jetty-client" % jettyVersion,
237237
"org.eclipse.jetty" % "jetty-server" % jettyVersion,
238238
"com.typesafe.scala-logging" %% "scala-logging" % scalaLoggingVersion,
239+
240+
"org.eclipse.jetty" % "jetty-servlet" % jettyVersion % Test,
241+
"org.slf4j" % "slf4j-simple" % "1.7.25" % Test,
239242
),
240243
)
241244

@@ -283,7 +286,7 @@ lazy val `commons-benchmark-js` = project.in(`commons-benchmark`.base / "js")
283286
)
284287

285288
lazy val `commons-mongo` = project
286-
.dependsOn(`commons-core`)
289+
.dependsOn(`commons-core` % CompileAndTest)
287290
.settings(
288291
jvmCommonSettings,
289292
libraryDependencies ++= Seq(
@@ -297,7 +300,7 @@ lazy val `commons-mongo` = project
297300
)
298301

299302
lazy val `commons-kafka` = project
300-
.dependsOn(`commons-core`)
303+
.dependsOn(`commons-core` % CompileAndTest)
301304
.settings(
302305
jvmCommonSettings,
303306
libraryDependencies ++= Seq(
@@ -306,7 +309,7 @@ lazy val `commons-kafka` = project
306309
)
307310

308311
lazy val `commons-redis` = project
309-
.dependsOn(`commons-core`)
312+
.dependsOn(`commons-core` % CompileAndTest)
310313
.settings(
311314
jvmCommonSettings,
312315
libraryDependencies ++= Seq(
@@ -318,7 +321,7 @@ lazy val `commons-redis` = project
318321
)
319322

320323
lazy val `commons-spring` = project
321-
.dependsOn(`commons-core`)
324+
.dependsOn(`commons-core` % CompileAndTest)
322325
.settings(
323326
jvmCommonSettings,
324327
libraryDependencies ++= Seq(
@@ -328,7 +331,7 @@ lazy val `commons-spring` = project
328331
)
329332

330333
lazy val `commons-akka` = project
331-
.dependsOn(`commons-core`)
334+
.dependsOn(`commons-core` % CompileAndTest)
332335
.settings(
333336
jvmCommonSettings,
334337
libraryDependencies ++= Seq(

commons-akka/src/main/scala/com/avsystem/commons/rpc/akka/MonixRPCFramework.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ trait MonixRPCFramework extends RPCFramework {
88
override type RawRPC <: MonixRawRPC
99

1010
trait MonixRawRPC { this: RawRPC =>
11-
@multi def observe(rpcName: String)(@multi args: List[RawValue]): Observable[RawValue]
11+
@multi def observe(@composite invocation: RawInvocation): Observable[RawValue]
1212
}
1313

1414
implicit def readerBasedObservableAsReal[T: Reader]: AsReal[Observable[RawValue], Observable[T]] =

commons-akka/src/main/scala/com/avsystem/commons/rpc/akka/RemoteMessage.scala

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package com.avsystem.commons
22
package rpc.akka
33

44
import akka.util.ByteString
5-
import com.avsystem.commons.rpc.akka.AkkaRPCFramework.RawValue
5+
import com.avsystem.commons.rpc.akka.AkkaRPCFramework._
66
import com.avsystem.commons.serialization.GenCodec
77

88
/**
@@ -11,8 +11,8 @@ import com.avsystem.commons.serialization.GenCodec
1111
private sealed trait RemoteMessage extends Serializable
1212

1313
private object RemoteMessage {
14-
implicit val byteStringCodec = GenCodec.create[ByteString](input => ByteString(input.readBinary()), (output, byteString) => output.writeBinary(byteString.toArray))
15-
implicit val rawInvocationCodec = GenCodec.materialize[RawInvocation]
14+
implicit val byteStringCodec: GenCodec[ByteString] =
15+
GenCodec.create[ByteString](input => ByteString(input.readBinary()), (output, byteString) => output.writeBinary(byteString.toArray))
1616

1717
implicit val procedureInvocationMessageCodec: GenCodec[ProcedureInvocationMessage] = GenCodec.materialize[ProcedureInvocationMessage]
1818
implicit val functionInvocationMessageCodec: GenCodec[FunctionInvocationMessage] = GenCodec.materialize[FunctionInvocationMessage]
@@ -28,16 +28,13 @@ private object RemoteMessage {
2828
implicit val heatBeatCodec: GenCodec[MonixProtocol.Heartbeat.type] = GenCodec.materialize[MonixProtocol.Heartbeat.type]
2929
}
3030

31-
private final case class RawInvocation(rpcName: String, args: List[RawValue]) extends RemoteMessage
32-
3331
private sealed trait InvocationMessage extends RemoteMessage {
3432
def getterChain: Seq[RawInvocation]
35-
def name: String
36-
def args: List[RawValue]
33+
def invocation: RawInvocation
3734
}
38-
private final case class ProcedureInvocationMessage(name: String, args: List[RawValue], getterChain: Seq[RawInvocation]) extends InvocationMessage
39-
private final case class FunctionInvocationMessage(name: String, args: List[RawValue], getterChain: Seq[RawInvocation]) extends InvocationMessage
40-
private final case class ObservableInvocationMessage(name: String, args: List[RawValue], getterChain: Seq[RawInvocation]) extends InvocationMessage
35+
private final case class ProcedureInvocationMessage(invocation: RawInvocation, getterChain: Seq[RawInvocation]) extends InvocationMessage
36+
private final case class FunctionInvocationMessage(invocation: RawInvocation, getterChain: Seq[RawInvocation]) extends InvocationMessage
37+
private final case class ObservableInvocationMessage(invocation: RawInvocation, getterChain: Seq[RawInvocation]) extends InvocationMessage
4138

4239
private sealed trait InvocationResult extends RemoteMessage
4340
private final case class InvocationSuccess(value: RawValue) extends InvocationResult
@@ -52,4 +49,4 @@ private object MonixProtocol {
5249
case object StreamCompleted extends RemoteMessage
5350

5451
case object Heartbeat extends RemoteMessage
55-
}
52+
}

commons-akka/src/main/scala/com/avsystem/commons/rpc/akka/client/ClientRawRPC.scala

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ package rpc.akka.client
44
import akka.actor.ActorSystem
55
import akka.pattern.ask
66
import akka.util.Timeout
7-
import com.avsystem.commons.rpc.akka.AkkaRPCFramework.{RawRPC, RawValue}
7+
import com.avsystem.commons.rpc.akka.AkkaRPCFramework._
88
import com.avsystem.commons.rpc.akka._
99
import monix.execution.Cancelable
1010
import monix.reactive.{Observable, OverflowStrategy}
@@ -14,12 +14,12 @@ import monix.reactive.{Observable, OverflowStrategy}
1414
*/
1515
private[akka] final class ClientRawRPC(config: AkkaRPCClientConfig, getterChain: Seq[RawInvocation] = Nil)(implicit system: ActorSystem) extends AkkaRPCFramework.RawRPC {
1616

17-
override def fire(rpcName: String)(args: List[RawValue]): Unit = {
18-
system.actorSelection(config.serverPath) ! ProcedureInvocationMessage(rpcName, args, getterChain)
17+
override def fire(invocation: RawInvocation): Unit = {
18+
system.actorSelection(config.serverPath) ! ProcedureInvocationMessage(invocation, getterChain)
1919
}
20-
override def call(rpcName: String)(args: List[RawValue]): Future[RawValue] = {
20+
override def call(invocation: RawInvocation): Future[RawValue] = {
2121
implicit val timeout: Timeout = Timeout(config.functionCallTimeout)
22-
val future = system.actorSelection(config.serverPath) ? FunctionInvocationMessage(rpcName, args, getterChain)
22+
val future = system.actorSelection(config.serverPath) ? FunctionInvocationMessage(invocation, getterChain)
2323

2424
import com.avsystem.commons.concurrent.RunNowEC.Implicits.executionContext
2525

@@ -29,13 +29,13 @@ private[akka] final class ClientRawRPC(config: AkkaRPCClientConfig, getterChain:
2929
case value => Future.failed(new IllegalStateException(s"Illegal message type. Should be InvocationResult, but received value was: $value"))
3030
}
3131
}
32-
override def get(rpcName: String)(args: List[RawValue]): RawRPC =
33-
new ClientRawRPC(config, getterChain :+ RawInvocation(rpcName, args))
32+
override def get(invocation: RawInvocation): RawRPC =
33+
new ClientRawRPC(config, getterChain :+ invocation)
3434

35-
override def observe(rpcName: String)(args: List[RawValue]): Observable[RawValue] = {
35+
override def observe(invocation: RawInvocation): Observable[RawValue] = {
3636
Observable.create(OverflowStrategy.Unbounded) { s =>
3737
val actor = system.actorOf(MonixClientActor.props(s, config))
38-
actor ! ObservableInvocationMessage(rpcName, args, getterChain)
38+
actor ! ObservableInvocationMessage(invocation, getterChain)
3939
Cancelable.empty // TODO implement proper canceling
4040
}
4141

commons-akka/src/main/scala/com/avsystem/commons/rpc/akka/serialization/RemoteMessageSerializer.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ final class RemoteMessageSerializer extends Serializer {
2626
case c if c == classOf[ProcedureInvocationMessage] => GenCodec.read[ProcedureInvocationMessage](input)
2727
case c if c == classOf[FunctionInvocationMessage] => GenCodec.read[FunctionInvocationMessage](input)
2828
case c if c == classOf[ObservableInvocationMessage] => GenCodec.read[ObservableInvocationMessage](input)
29-
case c if c == classOf[RawInvocation] => GenCodec.read[RawInvocation](input)
3029
case c if c == classOf[InvocationSuccess] => GenCodec.read[InvocationSuccess](input)
3130
case c if c == classOf[InvocationFailure] => GenCodec.read[InvocationFailure](input)
3231
case c if c == MonixProtocol.Continue.getClass => GenCodec.read[MonixProtocol.Continue.type](input)
@@ -46,7 +45,6 @@ final class RemoteMessageSerializer extends Serializer {
4645
case m: ProcedureInvocationMessage => GenCodec.write(output, m)
4746
case m: FunctionInvocationMessage => GenCodec.write(output, m)
4847
case m: ObservableInvocationMessage => GenCodec.write(output, m)
49-
case m: RawInvocation => GenCodec.write(output, m)
5048
case m: InvocationSuccess => GenCodec.write(output, m)
5149
case m: InvocationFailure => GenCodec.write(output, m)
5250
case MonixProtocol.Continue => GenCodec.write(output, MonixProtocol.Continue)

commons-akka/src/main/scala/com/avsystem/commons/rpc/akka/server/ServerActor.scala

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,27 +5,28 @@ import akka.actor.{Actor, ActorLogging, Props}
55
import akka.pattern.{AskTimeoutException, ask}
66
import akka.util.Timeout
77
import com.avsystem.commons.concurrent.RunNowEC
8+
import com.avsystem.commons.rpc.akka.AkkaRPCFramework._
89
import com.avsystem.commons.rpc.akka._
910
import monix.execution.{Ack, Scheduler}
1011
import monix.reactive.Observable
1112

1213
/**
1314
* @author Wojciech Milewski
1415
*/
15-
private final class ServerActor(rawRPC: AkkaRPCFramework.RawRPC, config: AkkaRPCServerConfig) extends Actor with ActorLogging {
16+
private final class ServerActor(rawRPC: RawRPC, config: AkkaRPCServerConfig) extends Actor with ActorLogging {
1617

1718
override def receive: Receive = {
18-
case msg@ProcedureInvocationMessage(name, argLists, getterChain) =>
19-
resolveRpc(msg).fire(name)(argLists)
20-
case msg@FunctionInvocationMessage(name, argLists, getterChain) =>
19+
case ProcedureInvocationMessage(invocation, getterChain) =>
20+
resolveRpc(getterChain).fire(invocation)
21+
case FunctionInvocationMessage(invocation, getterChain) =>
2122
val s = sender()
22-
resolveRpc(msg).call(name)(argLists).onCompleteNow {
23+
resolveRpc(getterChain).call(invocation).onCompleteNow {
2324
case Success(value) => s ! InvocationSuccess(value)
2425
case Failure(e) =>
25-
logError(e, name)
26+
logError(e, invocation.rpcName)
2627
s ! InvocationFailure(e.getClass.getCanonicalName, e.getMessage)
2728
}
28-
case msg@ObservableInvocationMessage(name, argLists, getterChain) =>
29+
case ObservableInvocationMessage(invocation, getterChain) =>
2930
implicit val scheduler: Scheduler = Scheduler(RunNowEC)
3031
implicit val timeout: Timeout = Timeout(config.observableAckTimeout)
3132
val s = sender()
@@ -36,7 +37,7 @@ private final class ServerActor(rawRPC: AkkaRPCFramework.RawRPC, config: AkkaRPC
3637
Ack.Continue
3738
}
3839

39-
resolveRpc(msg).observe(name)(argLists).subscribe(
40+
resolveRpc(getterChain).observe(invocation).subscribe(
4041
value => {
4142
val result = s ? InvocationSuccess(value)
4243
result.mapTo[MonixProtocol.RemoteAck].map {
@@ -53,7 +54,7 @@ private final class ServerActor(rawRPC: AkkaRPCFramework.RawRPC, config: AkkaRPC
5354
},
5455
e => {
5556
heartbeat.cancel()
56-
logError(e, name)
57+
logError(e, invocation.rpcName)
5758
s ! InvocationFailure(e.getClass.getCanonicalName, e.getMessage)
5859
},
5960
() => {
@@ -63,8 +64,8 @@ private final class ServerActor(rawRPC: AkkaRPCFramework.RawRPC, config: AkkaRPC
6364
)
6465
}
6566

66-
private def resolveRpc(msg: InvocationMessage) =
67-
rawRPC.resolveGetterChain(msg.getterChain.map(r => AkkaRPCFramework.RawInvocation(r.rpcName, r.args)).toList)
67+
private def resolveRpc(getterChain: Seq[RawInvocation]): RawRPC =
68+
rawRPC.resolveGetterChain(getterChain)
6869

6970
private def logError(e: Throwable, methodName: String): Unit = {
7071
log.error(e,
@@ -76,5 +77,5 @@ private final class ServerActor(rawRPC: AkkaRPCFramework.RawRPC, config: AkkaRPC
7677
}
7778

7879
private[akka] object ServerActor {
79-
def props(rawRPC: AkkaRPCFramework.RawRPC, config: AkkaRPCServerConfig): Props = Props(new ServerActor(rawRPC, config))
80+
def props(rawRPC: RawRPC, config: AkkaRPCServerConfig): Props = Props(new ServerActor(rawRPC, config))
8081
}

commons-akka/src/test/scala/com/avsystem/commons/rpc/akka/AkkaRPCFrameworkTest.scala

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,9 @@ import scala.concurrent.duration._
1414
* @author Wojciech Milewski
1515
*/
1616
abstract class AkkaRPCFrameworkTest(
17-
serverSystem: ActorSystem,
18-
clientSystem: ActorSystem,
19-
serverSystemPath: Option[String] = None)
20-
extends FlatSpec with RPCFrameworkTest with ProcedureRPCTest with FunctionRPCTest with GetterRPCTest with ObservableRPCTest with BeforeAndAfterAll {
17+
serverSystem: ActorSystem, clientSystem: ActorSystem, serverSystemPath: Option[String] = None)
18+
extends FlatSpec with RPCFrameworkTest with ProcedureRPCTest with FunctionRPCTest with GetterRPCTest with ObservableRPCTest
19+
with BeforeAndAfterAll {
2120

2221
/**
2322
* Servers as identifier supplier for each test case to allow tests parallelization.

commons-akka/src/test/scala/com/avsystem/commons/rpc/akka/RPCFrameworkTest.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,10 @@ trait RPCFrameworkTest extends FlatSpecLike with Matchers with MockitoSugar with
2121

2222
import RPCFrameworkTest._
2323

24-
val callTimeout = 200.millis
24+
val callTimeout: FiniteDuration = 200.millis
2525

26-
override implicit val patienceConfig: PatienceConfig = PatienceConfig(timeout = Span.convertDurationToSpan(3.seconds))
26+
override implicit val patienceConfig: PatienceConfig =
27+
PatienceConfig(timeout = Span.convertDurationToSpan(10.seconds))
2728

2829
/**
2930
* Run tests with connection between client and server.

0 commit comments

Comments
 (0)