Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 28 additions & 20 deletions core/src/main/scala/feral/IOSetup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,38 @@

package feral

import cats.effect.unsafe.IORuntime
import cats.effect.kernel.Resource
import cats.data.OptionT
import cats.effect.IO
import cats.effect.kernel.Deferred
import cats.effect.kernel.{Deferred, Resource}
import cats.syntax.all._
import scala.annotation.nowarn

private[feral] trait IOSetup {

protected def runtime: IORuntime = IORuntime.global
private[feral] trait IOSetup[Context] {

protected type Setup
protected def setup: Resource[IO, Setup] = Resource.pure(null.asInstanceOf[Setup])

private[feral] final lazy val setupMemo: IO[Setup] = {
val deferred = Deferred.unsafe[IO, Either[Throwable, Setup]]
setup
.attempt
.allocated
.flatTap {
case (setup, _) =>
deferred.complete(setup)
}
.unsafeRunAndForget()(runtime)
deferred.get.rethrow
}
protected def setup(@nowarn ctx: Context): Resource[IO, Setup] =
Resource.pure(null.asInstanceOf[Setup])

private[this] final lazy val deferred = Deferred.unsafe[IO, Either[Throwable, Setup]]

private[feral] final def setupMemo(ctx: Context): IO[Setup] =
OptionT(deferred.tryGet)
.getOrElseF(
setup(ctx).attempt.allocated.flatMap {
case (setup, _) =>
deferred.complete(setup).as(setup)
}
)
.rethrow

}

private[feral] object IOSetup {
private[feral] def apply[Context, Result](setupRes: Context => Resource[IO, Result])(
ctx: Context): IO[Result] =
new IOSetup[Context] {
override protected type Setup = Result

override protected def setup(ctx: Context): Resource[IO, Setup] = setupRes(ctx)
}.setupMemo(ctx)
}
6 changes: 3 additions & 3 deletions lambda/js/src/main/scala/feral/lambda/IOLambdaPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import scala.scalajs.js.JSConverters._
import scala.scalajs.js.|

private[lambda] trait IOLambdaPlatform[Event, Result] {
this: IOLambda[Event, Result] =>
this: IOLambdaSetup[Event, Result] =>

final def main(args: Array[String]): Unit =
js.Dynamic.global.exports.updateDynamic(handlerName)(handlerFn)
Expand All @@ -35,9 +35,9 @@ private[lambda] trait IOLambdaPlatform[Event, Result] {
: js.Function2[js.Any, facade.Context, js.Promise[js.Any | Unit]] = {
(event: js.Any, context: facade.Context) =>
(for {
lambda <- setupMemo
event <- IO.fromEither(decodeJs[Event](event))
result <- lambda(event, Context.fromJS(context))
ctx = Context.fromJS[IO](context)
result <- setupAndRun(event, ctx)
} yield result.map(_.asJsAny).orUndefined).unsafeToPromise()(runtime)
}
}
5 changes: 2 additions & 3 deletions lambda/jvm/src/main/scala/feral/lambda/IOLambdaPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import java.io.OutputStream
import java.io.OutputStreamWriter

private[lambda] abstract class IOLambdaPlatform[Event, Result]
extends lambdaRuntime.RequestStreamHandler { this: IOLambda[Event, Result] =>
extends lambdaRuntime.RequestStreamHandler { this: IOLambdaSetup[Event, Result] =>

final def handleRequest(
input: InputStream,
Expand All @@ -38,7 +38,6 @@ private[lambda] abstract class IOLambdaPlatform[Event, Result]
Resource
.eval {
for {
lambda <- setupMemo
event <- fs2
.io
.readInputStream(IO.pure(input), 8192, closeAfterUse = false)
Expand All @@ -48,7 +47,7 @@ private[lambda] abstract class IOLambdaPlatform[Event, Result]
.compile
.lastOrError
context <- IO(Context.fromJava[IO](context))
_ <- OptionT(lambda(event, context)).foreachF { result =>
_ <- OptionT(setupAndRun(event, context)).foreachF { result =>
Resource.fromAutoCloseable(IO(new OutputStreamWriter(output))).use { writer =>
IO.blocking(Printer.noSpaces.unsafePrintToAppendable(result.asJson, writer))
}
Expand Down
9 changes: 6 additions & 3 deletions lambda/shared/src/main/scala/feral/lambda/IOLambda.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,20 @@ abstract class IOLambda[Event, Result](
implicit private[lambda] val decoder: Decoder[Event],
private[lambda] val encoder: Encoder[Result]
) extends IOLambdaPlatform[Event, Result]
with IOSetup {
with IOLambdaSetup[Event, Result]
with IOSetup[Unit] {

final type Setup = (Event, Context[IO]) => IO[Option[Result]]
final override protected def setup: Resource[IO, Setup] = for {
final override protected def setup(ctx: Unit): Resource[IO, Setup] = for {
handler <- handler
localEvent <- IOLocal[Event](null.asInstanceOf[Event]).toResource
localContext <- IOLocal[Context[IO]](null).toResource
env = LambdaEnv.ioLambdaEnv(localEvent, localContext)
result = handler(env)
} yield { localEvent.set(_) *> localContext.set(_) *> result }

final override def setupAndRun(ev: Event, ctx: Context[IO]): IO[Option[Result]] =
setupMemo(()).flatMap(_(ev, ctx))

def handler: Resource[IO, LambdaEnv[IO, Event] => IO[Option[Result]]]

}
Expand Down
31 changes: 31 additions & 0 deletions lambda/shared/src/main/scala/feral/lambda/IOLambdaSetup.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2021 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package feral.lambda

import cats.effect.IO
import cats.effect.unsafe.IORuntime
import io.circe.{Decoder, Encoder}

private[lambda] trait IOLambdaSetup[Event, Result] {
protected def runtime: IORuntime = IORuntime.global

private[lambda] implicit val decoder: Decoder[Event]
private[lambda] implicit val encoder: Encoder[Result]
final type Setup = (Event, Context[IO]) => IO[Option[Result]]

private[lambda] def setupAndRun(ev: Event, ctx: Context[IO]): IO[Option[Result]]
}
75 changes: 75 additions & 0 deletions lambda/shared/src/main/scala/feral/lambda/TracedIOLambda.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright 2021 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package feral
package lambda

import cats.effect.kernel.Resource
import cats.effect.syntax.all._
import cats.effect.{IO, IOLocal}
import io.circe.{Decoder, Encoder}
import natchez._

import java.net.URI

abstract class TracedIOLambda[Event, Result](entryPoint: IO[EntryPoint[IO]])(
implicit private[lambda] val decoder: Decoder[Event],
private[lambda] val encoder: Encoder[Result],
private[lambda] val KS: KernelSource[Event]
) extends IOLambdaPlatform[Event, Result]
with IOLambdaSetup[Event, Result]
with IOSetup[(Event, Context[IO])] {

final override protected def setup(ctx: (Event, Context[IO])): Resource[IO, Setup] = for {
localEvent <- IOLocal(ctx._1).toResource
localContext <- IOLocal(ctx._2).toResource
env = LambdaEnv.ioLambdaEnv(localEvent, localContext)
trace <- Trace.ioTrace(defaultSpan(env)).toResource
setupResult = IOSetup(handler.tupled)(env -> trace)
} yield {
localEvent.set(_) *> localContext.set(_) *> trace.span("")(setupResult.flatten)
}

final override def setupAndRun(ev: Event, ctx: Context[IO]): IO[Option[Result]] =
setupMemo(ev -> ctx).flatMap(_(ev, ctx))

private[this] def defaultSpan(env: LambdaEnv[IO, Event]): Span[IO] =
new Span[IO] {
def put(fields: (String, TraceValue)*): IO[Unit] = IO.unit

def kernel: IO[Kernel] = env.event.map(KS.extract)

def span(name: String): Resource[IO, Span[IO]] = for {
ctx <- env.context.toResource
k <- kernel.toResource
ep <- entryPoint.toResource
span <- ep.continueOrElseRoot(ctx.functionName, k).evalTap { span =>
span.put(
AwsTags.arn(ctx.invokedFunctionArn),
AwsTags.requestId(ctx.awsRequestId)
)
}
} yield span

def traceId: IO[Option[String]] = IO.none

def spanId: IO[Option[String]] = IO.none

def traceUri: IO[Option[URI]] = IO.none
}

def handler: (LambdaEnv[IO, Event], Trace[IO]) => Resource[IO, IO[Option[Result]]]
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,10 @@ class IOLambdaSuite extends CatsEffectSuite {
.as(_.event.map(Some(_)) <* invokeCounter.getAndUpdate(_ + 1))
}
}
handler <- lambda.setupMemo
_ <- ('0' to 'z')
.map(_.toString)
.toList
.traverse(x => handler(x, mockContext).assertEquals(Some(x)))
.traverse(x => lambda.setupAndRun(x, mockContext).assertEquals(Some(x)))
_ <- allocationCounter.get.assertEquals(1)
_ <- invokeCounter.get.assertEquals(75)
} yield ()
Expand Down
125 changes: 125 additions & 0 deletions lambda/shared/src/test/scala/feral/lambda/TracedIOLambdaSuite.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Copyright 2021 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package feral.lambda

import cats.effect.{IO, Resource}
import cats.effect.kernel.Ref
import cats.implicits._
import munit.CatsEffectSuite
import natchez._

import java.net.URI

class TracedIOLambdaSuite extends CatsEffectSuite {
case class MockCompleteSpan(
name: String,
fields: Map[String, TraceValue] = Map.empty,
children: Vector[MockCompleteSpan] = Vector.empty)

case class MockSpan(
name: String,
fields: Ref[IO, Map[String, TraceValue]],
children: Ref[IO, Vector[MockCompleteSpan]])
extends Span[IO] {
override def put(addFields: (String, TraceValue)*): IO[Unit] = fields.update(_ ++ addFields)

override def kernel: IO[Kernel] = IO.pure(Kernel(Map.empty))

override def span(name: String): Resource[IO, Span[IO]] =
Resource.make(MockSpan.create(name))(MockSpan.finalize(_, children))

override def traceId: IO[Option[String]] = IO.none

override def spanId: IO[Option[String]] = IO.none

override def traceUri: IO[Option[URI]] = IO.none

}

object MockSpan {
def create(name: String): IO[MockSpan] =
(IO.ref(Map.empty[String, TraceValue]), IO.ref(Vector.empty[MockCompleteSpan])).mapN {
case (fields, children) => new MockSpan(name, fields, children)
}

def finalize(span: MockSpan, appendTo: Ref[IO, Vector[MockCompleteSpan]]): IO[Unit] = for {
fields <- span.fields.get
children <- span.children.get
completeSpan = MockCompleteSpan(span.name, fields, children)
_ <- appendTo.update(_.appended(completeSpan))
} yield ()
}

case class MockEntryPoint(children: Ref[IO, Vector[MockCompleteSpan]])
extends EntryPoint[IO] {

override def root(name: String): Resource[IO, Span[IO]] =
continueOrElseRoot(name, Kernel(Map.empty))

override def continue(name: String, kernel: Kernel): Resource[IO, Span[IO]] =
continueOrElseRoot(name, kernel)

override def continueOrElseRoot(name: String, kernel: Kernel): Resource[IO, Span[IO]] =
Resource.make(MockSpan.create(name))(MockSpan.finalize(_, children))
}

trait MockResource {
def query(s: String): IO[Option[String]]
}
object MockResource {
def apply(trace: Trace[IO]): Resource[IO, MockResource] =
Resource.eval(trace.span("init")(IO(new MockResource {
override def query(s: String): IO[Option[String]] = trace.span("query")(IO.some(s))
})))
}

test("Trace produces correct spans") {
implicit val ks: KernelSource[String] = KernelSource.emptyKernelSource
for {
spansRecorder <- IO.ref(Vector.empty[MockCompleteSpan])
entryPoint = MockEntryPoint(spansRecorder)
tracedLambda <- IO(new TracedIOLambda[String, String](IO.pure(entryPoint)) {
def handler: (LambdaEnv[IO, String], Trace[IO]) => Resource[IO, IO[Option[String]]] = {
(env, trace) => MockResource(trace).map(r => env.event.flatMap(r.query))
}
})
_ <- ('0' to '2')
.map(_.toString)
.toList
.traverse(e => tracedLambda.setupAndRun(e, mockContext(e)).assertEquals(Some(e)))
_ <- spansRecorder
.get
.assertEquals(
Vector(
expectRootSpan("0", Vector(MockCompleteSpan("init"), MockCompleteSpan("query"))),
expectRootSpan("1", Vector(MockCompleteSpan("query"))),
expectRootSpan("2", Vector(MockCompleteSpan("query")))
))
} yield ()
}

def mockContext(reqId: String) =
new Context[IO]("funcName", "", "funcArn", 0, reqId, "", "", None, None, IO.never)

def expectRootSpan(
requestId: String,
children: Vector[MockCompleteSpan] = Vector.empty): MockCompleteSpan =
MockCompleteSpan(
"funcName",
Map("aws.arn" -> "funcArn", "aws.requestId" -> requestId),
children)
}