From 20bf91d6365e7551ddd75e3102e018e52e035bdd Mon Sep 17 00:00:00 2001 From: Tadas Dailyda Date: Thu, 7 Jul 2022 16:32:47 +0300 Subject: [PATCH 1/2] Initial implementation of TracedIOLambda and related changes --- core/src/main/scala/feral/IOSetup.scala | 47 ++++--- .../scala/feral/lambda/IOLambdaPlatform.scala | 6 +- .../scala/feral/lambda/IOLambdaPlatform.scala | 5 +- .../main/scala/feral/lambda/IOLambda.scala | 9 +- .../scala/feral/lambda/IOLambdaSetup.scala | 31 +++++ .../scala/feral/lambda/TracedIOLambda.scala | 75 +++++++++++ .../scala/feral/lambda/IOLambdaSuite.scala | 3 +- .../feral/lambda/TracedIOLambdaSuite.scala | 125 ++++++++++++++++++ 8 files changed, 270 insertions(+), 31 deletions(-) create mode 100644 lambda/shared/src/main/scala/feral/lambda/IOLambdaSetup.scala create mode 100644 lambda/shared/src/main/scala/feral/lambda/TracedIOLambda.scala create mode 100644 lambda/shared/src/test/scala/feral/lambda/TracedIOLambdaSuite.scala diff --git a/core/src/main/scala/feral/IOSetup.scala b/core/src/main/scala/feral/IOSetup.scala index eda4dbe8..df6c1b72 100644 --- a/core/src/main/scala/feral/IOSetup.scala +++ b/core/src/main/scala/feral/IOSetup.scala @@ -16,30 +16,37 @@ package feral -import cats.effect.unsafe.IORuntime -import cats.effect.kernel.Resource +import cats.data.OptionT import cats.effect.IO -import cats.effect.kernel.Deferred +import cats.effect.kernel.{Deferred, Resource} import cats.syntax.all._ -private[feral] trait IOSetup { - - protected def runtime: IORuntime = IORuntime.global +private[feral] trait IOSetup[Context] { protected type Setup - protected def setup: Resource[IO, Setup] = Resource.pure(null.asInstanceOf[Setup]) - - private[feral] final lazy val setupMemo: IO[Setup] = { - val deferred = Deferred.unsafe[IO, Either[Throwable, Setup]] - setup - .attempt - .allocated - .flatTap { - case (setup, _) => - deferred.complete(setup) - } - .unsafeRunAndForget()(runtime) - deferred.get.rethrow - } + protected def setup(ctx: Context): Resource[IO, Setup] = + Resource.pure(null.asInstanceOf[Setup]) + + private[this] final lazy val deferred = Deferred.unsafe[IO, Either[Throwable, Setup]] + + private[feral] final def setupMemo(ctx: Context): IO[Setup] = + OptionT(deferred.tryGet) + .getOrElseF( + setup(ctx).attempt.allocated.flatMap { + case (setup, _) => + deferred.complete(setup).as(setup) + } + ) + .rethrow + +} + +private[feral] object IOSetup { + private[feral] def apply[Context, Result](setupRes: Context => Resource[IO, Result])( + ctx: Context): IO[Result] = + new IOSetup[Context] { + override protected type Setup = Result + override protected def setup(ctx: Context): Resource[IO, Setup] = setupRes(ctx) + }.setupMemo(ctx) } diff --git a/lambda/js/src/main/scala/feral/lambda/IOLambdaPlatform.scala b/lambda/js/src/main/scala/feral/lambda/IOLambdaPlatform.scala index 08bf57e8..dfa10fa9 100644 --- a/lambda/js/src/main/scala/feral/lambda/IOLambdaPlatform.scala +++ b/lambda/js/src/main/scala/feral/lambda/IOLambdaPlatform.scala @@ -24,7 +24,7 @@ import scala.scalajs.js.JSConverters._ import scala.scalajs.js.| private[lambda] trait IOLambdaPlatform[Event, Result] { - this: IOLambda[Event, Result] => + this: IOLambdaSetup[Event, Result] => final def main(args: Array[String]): Unit = js.Dynamic.global.exports.updateDynamic(handlerName)(handlerFn) @@ -35,9 +35,9 @@ private[lambda] trait IOLambdaPlatform[Event, Result] { : js.Function2[js.Any, facade.Context, js.Promise[js.Any | Unit]] = { (event: js.Any, context: facade.Context) => (for { - lambda <- setupMemo event <- IO.fromEither(decodeJs[Event](event)) - result <- lambda(event, Context.fromJS(context)) + ctx = Context.fromJS[IO](context) + result <- setupAndRun(event, ctx) } yield result.map(_.asJsAny).orUndefined).unsafeToPromise()(runtime) } } diff --git a/lambda/jvm/src/main/scala/feral/lambda/IOLambdaPlatform.scala b/lambda/jvm/src/main/scala/feral/lambda/IOLambdaPlatform.scala index 884f36ee..d758a934 100644 --- a/lambda/jvm/src/main/scala/feral/lambda/IOLambdaPlatform.scala +++ b/lambda/jvm/src/main/scala/feral/lambda/IOLambdaPlatform.scala @@ -29,7 +29,7 @@ import java.io.OutputStream import java.io.OutputStreamWriter private[lambda] abstract class IOLambdaPlatform[Event, Result] - extends lambdaRuntime.RequestStreamHandler { this: IOLambda[Event, Result] => + extends lambdaRuntime.RequestStreamHandler { this: IOLambdaSetup[Event, Result] => final def handleRequest( input: InputStream, @@ -38,7 +38,6 @@ private[lambda] abstract class IOLambdaPlatform[Event, Result] Resource .eval { for { - lambda <- setupMemo event <- fs2 .io .readInputStream(IO.pure(input), 8192, closeAfterUse = false) @@ -48,7 +47,7 @@ private[lambda] abstract class IOLambdaPlatform[Event, Result] .compile .lastOrError context <- IO(Context.fromJava[IO](context)) - _ <- OptionT(lambda(event, context)).foreachF { result => + _ <- OptionT(setupAndRun(event, context)).foreachF { result => Resource.fromAutoCloseable(IO(new OutputStreamWriter(output))).use { writer => IO.blocking(Printer.noSpaces.unsafePrintToAppendable(result.asJson, writer)) } diff --git a/lambda/shared/src/main/scala/feral/lambda/IOLambda.scala b/lambda/shared/src/main/scala/feral/lambda/IOLambda.scala index 03ddf1e2..8adea5ed 100644 --- a/lambda/shared/src/main/scala/feral/lambda/IOLambda.scala +++ b/lambda/shared/src/main/scala/feral/lambda/IOLambda.scala @@ -28,10 +28,10 @@ abstract class IOLambda[Event, Result]( implicit private[lambda] val decoder: Decoder[Event], private[lambda] val encoder: Encoder[Result] ) extends IOLambdaPlatform[Event, Result] - with IOSetup { + with IOLambdaSetup[Event, Result] + with IOSetup[Unit] { - final type Setup = (Event, Context[IO]) => IO[Option[Result]] - final override protected def setup: Resource[IO, Setup] = for { + final override protected def setup(ctx: Unit): Resource[IO, Setup] = for { handler <- handler localEvent <- IOLocal[Event](null.asInstanceOf[Event]).toResource localContext <- IOLocal[Context[IO]](null).toResource @@ -39,6 +39,9 @@ abstract class IOLambda[Event, Result]( result = handler(env) } yield { localEvent.set(_) *> localContext.set(_) *> result } + final override def setupAndRun(ev: Event, ctx: Context[IO]): IO[Option[Result]] = + setupMemo(()).flatMap(_(ev, ctx)) + def handler: Resource[IO, LambdaEnv[IO, Event] => IO[Option[Result]]] } diff --git a/lambda/shared/src/main/scala/feral/lambda/IOLambdaSetup.scala b/lambda/shared/src/main/scala/feral/lambda/IOLambdaSetup.scala new file mode 100644 index 00000000..1e1b60a3 --- /dev/null +++ b/lambda/shared/src/main/scala/feral/lambda/IOLambdaSetup.scala @@ -0,0 +1,31 @@ +/* + * Copyright 2021 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package feral.lambda + +import cats.effect.IO +import cats.effect.unsafe.IORuntime +import io.circe.{Decoder, Encoder} + +private[lambda] trait IOLambdaSetup[Event, Result] { + protected def runtime: IORuntime = IORuntime.global + + private[lambda] implicit val decoder: Decoder[Event] + private[lambda] implicit val encoder: Encoder[Result] + final type Setup = (Event, Context[IO]) => IO[Option[Result]] + + private[lambda] def setupAndRun(ev: Event, ctx: Context[IO]): IO[Option[Result]] +} diff --git a/lambda/shared/src/main/scala/feral/lambda/TracedIOLambda.scala b/lambda/shared/src/main/scala/feral/lambda/TracedIOLambda.scala new file mode 100644 index 00000000..cf33d552 --- /dev/null +++ b/lambda/shared/src/main/scala/feral/lambda/TracedIOLambda.scala @@ -0,0 +1,75 @@ +/* + * Copyright 2021 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package feral +package lambda + +import cats.effect.kernel.Resource +import cats.effect.syntax.all._ +import cats.effect.{IO, IOLocal} +import io.circe.{Decoder, Encoder} +import natchez._ + +import java.net.URI + +abstract class TracedIOLambda[Event, Result](entryPoint: IO[EntryPoint[IO]])( + implicit private[lambda] val decoder: Decoder[Event], + private[lambda] val encoder: Encoder[Result], + private[lambda] val KS: KernelSource[Event] +) extends IOLambdaPlatform[Event, Result] + with IOLambdaSetup[Event, Result] + with IOSetup[(Event, Context[IO])] { + + final override protected def setup(ctx: (Event, Context[IO])): Resource[IO, Setup] = for { + localEvent <- IOLocal(ctx._1).toResource + localContext <- IOLocal(ctx._2).toResource + env = LambdaEnv.ioLambdaEnv(localEvent, localContext) + trace <- Trace.ioTrace(defaultSpan(env)).toResource + setupResult = IOSetup(handler.tupled)(env -> trace) + } yield { + localEvent.set(_) *> localContext.set(_) *> trace.span("")(setupResult.flatten) + } + + final override def setupAndRun(ev: Event, ctx: Context[IO]): IO[Option[Result]] = + setupMemo(ev -> ctx).flatMap(_(ev, ctx)) + + private[this] def defaultSpan(env: LambdaEnv[IO, Event]): Span[IO] = + new Span[IO] { + def put(fields: (String, TraceValue)*): IO[Unit] = IO.unit + + def kernel: IO[Kernel] = env.event.map(KS.extract) + + def span(name: String): Resource[IO, Span[IO]] = for { + ctx <- env.context.toResource + k <- kernel.toResource + ep <- entryPoint.toResource + span <- ep.continueOrElseRoot(ctx.functionName, k).evalTap { span => + span.put( + AwsTags.arn(ctx.invokedFunctionArn), + AwsTags.requestId(ctx.awsRequestId) + ) + } + } yield span + + def traceId: IO[Option[String]] = IO.none + + def spanId: IO[Option[String]] = IO.none + + def traceUri: IO[Option[URI]] = IO.none + } + + def handler: (LambdaEnv[IO, Event], Trace[IO]) => Resource[IO, IO[Option[Result]]] +} diff --git a/lambda/shared/src/test/scala/feral/lambda/IOLambdaSuite.scala b/lambda/shared/src/test/scala/feral/lambda/IOLambdaSuite.scala index f02cf068..a260613f 100644 --- a/lambda/shared/src/test/scala/feral/lambda/IOLambdaSuite.scala +++ b/lambda/shared/src/test/scala/feral/lambda/IOLambdaSuite.scala @@ -34,11 +34,10 @@ class IOLambdaSuite extends CatsEffectSuite { .as(_.event.map(Some(_)) <* invokeCounter.getAndUpdate(_ + 1)) } } - handler <- lambda.setupMemo _ <- ('0' to 'z') .map(_.toString) .toList - .traverse(x => handler(x, mockContext).assertEquals(Some(x))) + .traverse(x => lambda.setupAndRun(x, mockContext).assertEquals(Some(x))) _ <- allocationCounter.get.assertEquals(1) _ <- invokeCounter.get.assertEquals(75) } yield () diff --git a/lambda/shared/src/test/scala/feral/lambda/TracedIOLambdaSuite.scala b/lambda/shared/src/test/scala/feral/lambda/TracedIOLambdaSuite.scala new file mode 100644 index 00000000..24546b99 --- /dev/null +++ b/lambda/shared/src/test/scala/feral/lambda/TracedIOLambdaSuite.scala @@ -0,0 +1,125 @@ +/* + * Copyright 2021 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package feral.lambda + +import cats.effect.{IO, Resource} +import cats.effect.kernel.Ref +import cats.implicits._ +import munit.CatsEffectSuite +import natchez._ + +import java.net.URI + +class TracedIOLambdaSuite extends CatsEffectSuite { + case class MockCompleteSpan( + name: String, + fields: Map[String, TraceValue] = Map.empty, + children: Vector[MockCompleteSpan] = Vector.empty) + + case class MockSpan( + name: String, + fields: Ref[IO, Map[String, TraceValue]], + children: Ref[IO, Vector[MockCompleteSpan]]) + extends Span[IO] { + override def put(addFields: (String, TraceValue)*): IO[Unit] = fields.update(_ ++ addFields) + + override def kernel: IO[Kernel] = IO.pure(Kernel(Map.empty)) + + override def span(name: String): Resource[IO, Span[IO]] = + Resource.make(MockSpan.create(name))(MockSpan.finalize(_, children)) + + override def traceId: IO[Option[String]] = IO.none + + override def spanId: IO[Option[String]] = IO.none + + override def traceUri: IO[Option[URI]] = IO.none + + } + + object MockSpan { + def create(name: String): IO[MockSpan] = + (IO.ref(Map.empty[String, TraceValue]), IO.ref(Vector.empty[MockCompleteSpan])).mapN { + case (fields, children) => new MockSpan(name, fields, children) + } + + def finalize(span: MockSpan, appendTo: Ref[IO, Vector[MockCompleteSpan]]): IO[Unit] = for { + fields <- span.fields.get + children <- span.children.get + completeSpan = MockCompleteSpan(span.name, fields, children) + _ <- appendTo.update(_.appended(completeSpan)) + } yield () + } + + case class MockEntryPoint(children: Ref[IO, Vector[MockCompleteSpan]]) + extends EntryPoint[IO] { + + override def root(name: String): Resource[IO, Span[IO]] = + continueOrElseRoot(name, Kernel(Map.empty)) + + override def continue(name: String, kernel: Kernel): Resource[IO, Span[IO]] = + continueOrElseRoot(name, kernel) + + override def continueOrElseRoot(name: String, kernel: Kernel): Resource[IO, Span[IO]] = + Resource.make(MockSpan.create(name))(MockSpan.finalize(_, children)) + } + + trait MockResource { + def query(s: String): IO[Option[String]] + } + object MockResource { + def apply(trace: Trace[IO]): Resource[IO, MockResource] = + Resource.eval(trace.span("init")(IO(new MockResource { + override def query(s: String): IO[Option[String]] = trace.span("query")(IO.some(s)) + }))) + } + + test("Trace produces correct spans") { + implicit val ks: KernelSource[String] = KernelSource.emptyKernelSource + for { + spansRecorder <- IO.ref(Vector.empty[MockCompleteSpan]) + entryPoint = MockEntryPoint(spansRecorder) + tracedLambda <- IO(new TracedIOLambda[String, String](IO.pure(entryPoint)) { + def handler: (LambdaEnv[IO, String], Trace[IO]) => Resource[IO, IO[Option[String]]] = { + (env, trace) => MockResource(trace).map(r => env.event.flatMap(r.query)) + } + }) + _ <- ('0' to '2') + .map(_.toString) + .toList + .traverse(e => tracedLambda.setupAndRun(e, mockContext(e)).assertEquals(Some(e))) + _ <- spansRecorder + .get + .assertEquals( + Vector( + expectRootSpan("0", Vector(MockCompleteSpan("init"), MockCompleteSpan("query"))), + expectRootSpan("1", Vector(MockCompleteSpan("query"))), + expectRootSpan("2", Vector(MockCompleteSpan("query"))) + )) + } yield () + } + + def mockContext(reqId: String) = + new Context[IO]("funcName", "", "funcArn", 0, reqId, "", "", None, None, IO.never) + + def expectRootSpan( + requestId: String, + children: Vector[MockCompleteSpan] = Vector.empty): MockCompleteSpan = + MockCompleteSpan( + "funcName", + Map("aws.arn" -> "funcArn", "aws.requestId" -> requestId), + children) +} From 02f6a06a14c372e7747ebcd07eca7157991d4f55 Mon Sep 17 00:00:00 2001 From: Tadas Dailyda Date: Thu, 7 Jul 2022 17:19:38 +0300 Subject: [PATCH 2/2] suppress unused param warning --- core/src/main/scala/feral/IOSetup.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/feral/IOSetup.scala b/core/src/main/scala/feral/IOSetup.scala index df6c1b72..2822e3c0 100644 --- a/core/src/main/scala/feral/IOSetup.scala +++ b/core/src/main/scala/feral/IOSetup.scala @@ -20,11 +20,12 @@ import cats.data.OptionT import cats.effect.IO import cats.effect.kernel.{Deferred, Resource} import cats.syntax.all._ +import scala.annotation.nowarn private[feral] trait IOSetup[Context] { protected type Setup - protected def setup(ctx: Context): Resource[IO, Setup] = + protected def setup(@nowarn ctx: Context): Resource[IO, Setup] = Resource.pure(null.asInstanceOf[Setup]) private[this] final lazy val deferred = Deferred.unsafe[IO, Either[Throwable, Setup]]