diff --git a/src/main/scala/com/cognite/sdk/scala/common/dataTypes.scala b/src/main/scala/com/cognite/sdk/scala/common/dataTypes.scala index f6b0bf24e..8bc9719e5 100644 --- a/src/main/scala/com/cognite/sdk/scala/common/dataTypes.scala +++ b/src/main/scala/com/cognite/sdk/scala/common/dataTypes.scala @@ -5,7 +5,7 @@ package com.cognite.sdk.scala.common import java.time.Instant import cats.Id -import com.cognite.sdk.scala.v1.CogniteId +import com.cognite.sdk.scala.v1.{CogniteExternalId, CogniteId, CogniteInternalId} import io.circe.{Decoder, Encoder, Json, JsonObject} import io.circe.generic.semiauto.deriveDecoder import sttp.model.Uri @@ -96,7 +96,28 @@ final case class CdpApiException( ).flatMap(_.toList).mkString s"Request ${maybeId}to ${url.toString} failed with status ${code.toString}: $message.$details" - }) + }) { + def missingIds: Seq[CogniteId] = + missing.getOrElse(Seq.empty).flatMap(CdpApiException.decodeId) + def missingExternalIds: Seq[String] = + missingIds.collect { case CogniteExternalId(externalId) => + externalId + } + def missingInternalIds: Seq[Long] = + missingIds.collect { case CogniteInternalId(id) => + id + } + def duplicatedIds: Seq[CogniteId] = + duplicated.getOrElse(Seq.empty).flatMap(CdpApiException.decodeId) + def duplicatedExternalIds: Seq[String] = + duplicatedIds.collect { case CogniteExternalId(externalId) => + externalId + } + def duplicatedInternalIds: Seq[Long] = + duplicatedIds.collect { case CogniteInternalId(id) => + id + } +} object CdpApiException { private def describeErrorList(kind: String)(items: Seq[JsonObject]): String = @@ -120,6 +141,20 @@ object CdpApiException { s" $kind ${key}s: [$commaSeparatedValues]." } .mkString + + private val externalIdDecoder: Decoder[CogniteExternalId] = deriveDecoder + private val internalIdDecoder: Decoder[CogniteInternalId] = deriveDecoder + private def decodeId(jsonObject: JsonObject): Option[CogniteId] = { + val json = Json.fromJsonObject(jsonObject) + internalIdDecoder.decodeJson(json) match { + case Right(id: CogniteId) => Some(id) + case _ => + externalIdDecoder.decodeJson(json) match { + case Right(id) => Some(id) + case _ => None + } + } + } } final case class DataPoint( diff --git a/src/main/scala/com/cognite/sdk/scala/v1/Client.scala b/src/main/scala/com/cognite/sdk/scala/v1/Client.scala index 49d05f55d..893789f6f 100644 --- a/src/main/scala/com/cognite/sdk/scala/v1/Client.scala +++ b/src/main/scala/com/cognite/sdk/scala/v1/Client.scala @@ -168,6 +168,7 @@ class GenericClient[F[_]]( lazy val dataSets = new DataSets[F](requestSession) lazy val labels = new Labels[F](requestSession) lazy val relationships = new Relationships[F](requestSession) + lazy val transformations = new Transformations[F](requestSession) lazy val rawDatabases = new RawDatabases[F](requestSession) def rawTables(database: String): RawTables[F] = new RawTables(requestSession, database) diff --git a/src/main/scala/com/cognite/sdk/scala/v1/resources/transformations.scala b/src/main/scala/com/cognite/sdk/scala/v1/resources/transformations.scala new file mode 100644 index 000000000..adaac29e6 --- /dev/null +++ b/src/main/scala/com/cognite/sdk/scala/v1/resources/transformations.scala @@ -0,0 +1,252 @@ +// Copyright 2020 Cognite AS +// SPDX-License-Identifier: Apache-2.0 + +package com.cognite.sdk.scala.v1.resources + +import cats.Monad +import cats.syntax.all._ +import com.cognite.sdk.scala.common._ +import com.cognite.sdk.scala.v1._ +import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} +import io.circe.{Decoder, Encoder} +import sttp.client3._ +import sttp.client3.circe._ + +class Transformations[F[_]: Monad](val requestSession: RequestSession[F]) + extends WithRequestSession[F] + with Create[TransformationRead, TransformationCreate, F] + with RetrieveByIdsWithIgnoreUnknownIds[TransformationRead, F] + with Readable[TransformationRead, F] + with RetrieveByExternalIdsWithIgnoreUnknownIds[TransformationRead, F] + with DeleteByIdsWithIgnoreUnknownIds[F, Long] + with DeleteByExternalIdsWithIgnoreUnknownIds[F] + with UpdateById[TransformationRead, TransformationUpdate, F] + with UpdateByExternalId[TransformationRead, TransformationUpdate, F] { + import Transformations._ + override val baseUrl = uri"${requestSession.baseUrl}/transformations" + + val schedules: TransformationSchedules[F] = new TransformationSchedules[F](requestSession) + + override private[sdk] def readWithCursor( + cursor: Option[String], + limit: Option[Int], + partition: Option[Partition] + ): F[ItemsWithCursor[TransformationRead]] = + Readable.readWithCursor( + requestSession, + baseUrl, + cursor, + limit, + partition, + 1000 + ) + + override def retrieveByIds( + ids: Seq[Long], + ignoreUnknownIds: Boolean + ): F[Seq[TransformationRead]] = + RetrieveByIdsWithIgnoreUnknownIds.retrieveByIds( + requestSession, + baseUrl, + ids, + ignoreUnknownIds + ) + + override def retrieveByExternalIds( + externalIds: Seq[String], + ignoreUnknownIds: Boolean + ): F[Seq[TransformationRead]] = + RetrieveByExternalIdsWithIgnoreUnknownIds.retrieveByExternalIds( + requestSession, + baseUrl, + externalIds, + ignoreUnknownIds + ) + + override def createItems(items: Items[TransformationCreate]): F[Seq[TransformationRead]] = + Create.createItems[F, TransformationRead, TransformationCreate]( + requestSession, + baseUrl, + items + ) + + override def updateById( + items: Map[Long, TransformationUpdate] + ): F[Seq[TransformationRead]] = + UpdateById.updateById[F, TransformationRead, TransformationUpdate]( + requestSession, + baseUrl, + items + ) + + override def updateByExternalId( + items: Map[String, TransformationUpdate] + ): F[Seq[TransformationRead]] = + UpdateByExternalId.updateByExternalId[F, TransformationRead, TransformationUpdate]( + requestSession, + baseUrl, + items + ) + + override def deleteByIds(ids: Seq[Long]): F[Unit] = deleteByIds(ids, false) + + override def deleteByIds(ids: Seq[Long], ignoreUnknownIds: Boolean): F[Unit] = + DeleteByIds.deleteByIdsWithIgnoreUnknownIds(requestSession, baseUrl, ids, ignoreUnknownIds) + + override def deleteByExternalIds(externalIds: Seq[String]): F[Unit] = + deleteByExternalIds(externalIds, false) + + override def deleteByExternalIds(externalIds: Seq[String], ignoreUnknownIds: Boolean): F[Unit] = + DeleteByExternalIds.deleteByExternalIdsWithIgnoreUnknownIds( + requestSession, + baseUrl, + externalIds, + ignoreUnknownIds + ) + + def query[I]( + query: String, + sourceLimit: Option[Int], + limit: Int = 1000 + )(implicit itemDecoder: Decoder[I]): F[QueryResponse[I]] = { + implicit val responseItemsDecoder: Decoder[Items[I]] = deriveDecoder[Items[I]] + val _ = itemDecoder.hashCode + responseItemsDecoder.hashCode // suppress no usage warnings... 🤦 + implicit val responseDecoder: Decoder[QueryResponse[I]] = deriveDecoder[QueryResponse[I]] + requestSession.post[QueryResponse[I], QueryResponse[I], QueryQuery]( + QueryQuery(query), + uri"$baseUrl/query/run" + .addParam("limit", limit.toString) + .addParam("sourceLimit", sourceLimit.map(_.toString).getOrElse("all")), + identity + ) + } + + @SuppressWarnings(Array("org.wartremover.warts.TraversableOps")) + def queryOne[I]( + q: String, + sourceLimit: Option[Int] = None + )(implicit itemDecoder: Decoder[I]): F[I] = + query[I](q, sourceLimit, limit = 1).map(_.results.items.head) +} + +object Transformations { + implicit val scheduleReadDecoder: Decoder[TransformationScheduleRead] = deriveDecoder[TransformationScheduleRead] + implicit val readDecoder: Decoder[TransformationRead] = deriveDecoder[TransformationRead] + implicit val readItemsWithCursorDecoder: Decoder[ItemsWithCursor[TransformationRead]] = + deriveDecoder[ItemsWithCursor[TransformationRead]] + implicit val readItemsDecoder: Decoder[Items[TransformationRead]] = + deriveDecoder[Items[TransformationRead]] + implicit val createEncoder: Encoder[TransformationCreate] = deriveEncoder[TransformationCreate] + implicit val createItemsEncoder: Encoder[Items[TransformationCreate]] = + deriveEncoder[Items[TransformationCreate]] + implicit val updateEncoder: Encoder[TransformationUpdate] = + deriveEncoder[TransformationUpdate] + + implicit val errorOrUnitDecoder: Decoder[Either[CdpApiError, Unit]] = + EitherDecoder.eitherDecoder[CdpApiError, Unit] + implicit val deleteRequestWithRecursiveAndIgnoreUnknownIdsEncoder + : Encoder[ItemsWithRecursiveAndIgnoreUnknownIds] = + deriveEncoder[ItemsWithRecursiveAndIgnoreUnknownIds] + implicit val cogniteExternalIdDecoder: Decoder[CogniteExternalId] = + deriveDecoder[CogniteExternalId] + implicit val queryEncoder: Encoder[QueryQuery] = + deriveEncoder[QueryQuery] + implicit val querySchemaDecoder: Decoder[QuerySchemaColumn] = deriveDecoder[QuerySchemaColumn] + implicit val querySchemaItemsDecoder: Decoder[Items[QuerySchemaColumn]] = + deriveDecoder[Items[QuerySchemaColumn]] +} + +class TransformationSchedules[F[_]](val requestSession: RequestSession[F]) + extends WithRequestSession[F] + with Create[TransformationScheduleRead, TransformationScheduleCreate, F] + with RetrieveByIdsWithIgnoreUnknownIds[TransformationScheduleRead, F] + with Readable[TransformationScheduleRead, F] + with RetrieveByExternalIdsWithIgnoreUnknownIds[TransformationScheduleRead, F] + with DeleteByIdsWithIgnoreUnknownIds[F, Long] + with DeleteByExternalIdsWithIgnoreUnknownIds[F] + //with UpdateById[TransformConfigRead, StandardTransformConfigUpdate, F] + //with UpdateByExternalId[TransformConfigRead, StandardTransformConfigUpdate, F] + { + import TransformationSchedules._ + override val baseUrl = uri"${requestSession.baseUrl}/transformations/schedules" + + override private[sdk] def readWithCursor( + cursor: Option[String], + limit: Option[Int], + partition: Option[Partition] + ): F[ItemsWithCursor[TransformationScheduleRead]] = + Readable.readWithCursor( + requestSession, + baseUrl, + cursor, + limit, + partition, + 100 + ) + + override def retrieveByIds( + ids: Seq[Long], + ignoreUnknownIds: Boolean + ): F[Seq[TransformationScheduleRead]] = + RetrieveByIdsWithIgnoreUnknownIds.retrieveByIds( + requestSession, + baseUrl, + ids, + ignoreUnknownIds + ) + + override def retrieveByExternalIds( + externalIds: Seq[String], + ignoreUnknownIds: Boolean + ): F[Seq[TransformationScheduleRead]] = + RetrieveByExternalIdsWithIgnoreUnknownIds.retrieveByExternalIds( + requestSession, + baseUrl, + externalIds, + ignoreUnknownIds + ) + + override def createItems( + items: Items[TransformationScheduleCreate] + ): F[Seq[TransformationScheduleRead]] = + Create.createItems[F, TransformationScheduleRead, TransformationScheduleCreate]( + requestSession, + baseUrl, + items + ) + + override def deleteByIds(ids: Seq[Long]): F[Unit] = deleteByIds(ids, false) + + override def deleteByIds(ids: Seq[Long], ignoreUnknownIds: Boolean): F[Unit] = + DeleteByIds.deleteByIdsWithIgnoreUnknownIds(requestSession, baseUrl, ids, ignoreUnknownIds) + + override def deleteByExternalIds(externalIds: Seq[String]): F[Unit] = + deleteByExternalIds(externalIds, false) + + override def deleteByExternalIds(externalIds: Seq[String], ignoreUnknownIds: Boolean): F[Unit] = + DeleteByExternalIds.deleteByExternalIdsWithIgnoreUnknownIds( + requestSession, + baseUrl, + externalIds, + ignoreUnknownIds + ) +} + +object TransformationSchedules { + implicit val readDecoder: Decoder[TransformationScheduleRead] = + deriveDecoder[TransformationScheduleRead] + implicit val readItemsWithCursorDecoder: Decoder[ItemsWithCursor[TransformationScheduleRead]] = + deriveDecoder[ItemsWithCursor[TransformationScheduleRead]] + implicit val readItemsDecoder: Decoder[Items[TransformationScheduleRead]] = + deriveDecoder[Items[TransformationScheduleRead]] + implicit val createEncoder: Encoder[TransformationScheduleCreate] = + deriveEncoder[TransformationScheduleCreate] + implicit val createItemsEncoder: Encoder[Items[TransformationScheduleCreate]] = + deriveEncoder[Items[TransformationScheduleCreate]] + //implicit val updateEncoder: Encoder[StandardTransformConfigUpdate] = deriveEncoder[StandardTransformConfigUpdate] + + implicit val errorOrUnitDecoder: Decoder[Either[CdpApiError, Unit]] = + EitherDecoder.eitherDecoder[CdpApiError, Unit] + implicit val cogniteExternalIdDecoder: Decoder[CogniteExternalId] = + deriveDecoder[CogniteExternalId] +} diff --git a/src/main/scala/com/cognite/sdk/scala/v1/transformations.scala b/src/main/scala/com/cognite/sdk/scala/v1/transformations.scala new file mode 100644 index 000000000..828abf986 --- /dev/null +++ b/src/main/scala/com/cognite/sdk/scala/v1/transformations.scala @@ -0,0 +1,157 @@ +// Copyright 2020 Cognite AS +// SPDX-License-Identifier: Apache-2.0 + +package com.cognite.sdk.scala.v1 + +import java.time.Instant +import com.cognite.sdk.scala.common._ +import io.circe.{Decoder, Encoder, Json} +import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} + +final case class TransformationRead( + id: Long, + name: String, + query: String, + destination: Json, + conflictMode: String, + isPublic: Boolean = false, + blocked: Option[TransformBlacklistInfo] = None, + createdTime: Instant = Instant.ofEpochMilli(0), + //updated: Instant = Instant.ofEpochMilli(0), + //owner: TransformConfigOwner, + ownerIsCurrentUser: Boolean = true, + hasSourceApiKey: Boolean = false, + hasDestinationApiKey: Boolean = false, + hasSourceOidcCredentials: Boolean = false, + hasDestinationOidcCredentials: Boolean = false, + //lastFinishedJob: Option[JobDetails], + //runningJob: Option[JobDetails], + schedule: Option[TransformationScheduleRead] = None, + externalId: Option[String] = None +) extends WithId[Long] + with WithExternalId + with WithCreatedTime + with ToCreate[TransformationCreate] + with ToUpdate[TransformationUpdate] { + + override def toCreate: TransformationCreate = + TransformationCreate( + name = name, + query = Some(query), + destination = Some(destination), + conflictMode = Some(conflictMode), + isPublic = Some(isPublic), + sourceApiKey = None, + destinationApiKey = None, + sourceOidcCredentials = None, + destinationOidcCredentials = None, + externalId = externalId + ) + + override def toUpdate: TransformationUpdate = + TransformationUpdate( + name = Some(SetValue(name)), + destination = Some(SetValue(destination)), + conflictMode = Some(SetValue(conflictMode)), + query = Some(SetValue(query)), + //sourceOidcCredentials = if (hasSourceOidcCredentials) { None } else { Some(SetNull()) }, + //destinationOidcCredentials = if (hasDestinationOidcCredentials) { None } else { Some(SetNull()) }, + sourceApiKey = if (hasSourceApiKey) { None } + else { Some(SetNull()) }, + destinationApiKey = if (hasDestinationApiKey) { None } + else { Some(SetNull()) }, + isPublic = Some(SetValue(isPublic)) + ) +} + +final case class TransformBlacklistInfo( + reason: String, + createdTime: Instant +) +object TransformBlacklistInfo { + implicit val encoder: Encoder[TransformBlacklistInfo] = deriveEncoder[TransformBlacklistInfo] + implicit val decoder: Decoder[TransformBlacklistInfo] = deriveDecoder[TransformBlacklistInfo] +} + +final case class FlatOidcCredentials( + clientId: String, + clientSecret: String, + scopes: String, + tokenUri: String, + cdfProjectName: String +) +object FlatOidcCredentials { + implicit val encoder: Encoder[FlatOidcCredentials] = deriveEncoder[FlatOidcCredentials] + implicit val decoder: Decoder[FlatOidcCredentials] = deriveDecoder[FlatOidcCredentials] +} + +final case class TransformationCreate( + name: String, + query: Option[String] = None, + destination: Option[Json] = None, + conflictMode: Option[String] = None, + isPublic: Option[Boolean] = None, + sourceApiKey: Option[String] = None, + destinationApiKey: Option[String] = None, + sourceOidcCredentials: Option[FlatOidcCredentials] = None, + destinationOidcCredentials: Option[FlatOidcCredentials] = None, + externalId: Option[String] = None +) extends WithExternalId + +final case class TransformationUpdate( + name: Option[NonNullableSetter[String]] = None, + destination: Option[NonNullableSetter[Json]] = None, + conflictMode: Option[NonNullableSetter[String]] = None, + query: Option[NonNullableSetter[String]] = None, + sourceOidcCredentials: Option[Setter[FlatOidcCredentialsUpdate]] = None, + destinationOidcCredentials: Option[Setter[FlatOidcCredentialsUpdate]] = None, + sourceApiKey: Option[Setter[String]] = None, + destinationApiKey: Option[Setter[String]] = None, + isPublic: Option[NonNullableSetter[Boolean]] = None +) + +final case class FlatOidcCredentialsUpdate( + clientId: Option[String] = None, + clientSecret: Option[String] = None, + scopes: Option[String] = None, + tokenUri: Option[String] = None, + cdfProjectName: Option[String] = None +) + +object FlatOidcCredentialsUpdate { + implicit val encoder: Encoder[FlatOidcCredentialsUpdate] = + deriveEncoder[FlatOidcCredentialsUpdate] +} + +final case class QueryQuery(query: String) + +final case class QueryResponse[T]( + results: Items[T], + schema: Items[QuerySchemaColumn] +) +final case class QuerySchemaColumn(name: String, sqlType: String, nullable: Boolean) + +final case class TransformationScheduleCreate( + interval: String, + isPaused: Option[Boolean] = None, + id: Option[Long] = None, + externalId: Option[String] = None +) + +final case class TransformationScheduleRead( + id: Long, + externalId: Option[String], + createdTime: Instant, + interval: String, + isPaused: Boolean +) extends WithExternalId + with WithId[Long] + with WithCreatedTime + with ToCreate[TransformationScheduleCreate] { + override def toCreate: TransformationScheduleCreate = + TransformationScheduleCreate( + interval = interval, + isPaused = Some(isPaused), + id = Some(id) + ) +} diff --git a/src/test/scala/com/cognite/sdk/scala/common/ReadBehaviours.scala b/src/test/scala/com/cognite/sdk/scala/common/ReadBehaviours.scala index e1910a25b..477652de8 100644 --- a/src/test/scala/com/cognite/sdk/scala/common/ReadBehaviours.scala +++ b/src/test/scala/com/cognite/sdk/scala/common/ReadBehaviours.scala @@ -29,6 +29,17 @@ trait ReadBehaviours extends Matchers with OptionValues { this: AnyFlatSpec => readable.read(limit = Some(1)).items should have length Math.min(listLength.toLong, 1) readable.read(limit = Some(2)).items should have length Math.min(listLength.toLong, 2) } + + it should "be able to page items by one" in { + val itemsPagedByOne = + Readable.pullFromCursor(None, None, None, (c, _, p) => readable.readWithCursor(c, Some(1), p)) + .stream + .take(5) + .compile.toList + itemsPagedByOne.length should be(Math.min(5, listLength)) + // all are distinct + itemsPagedByOne.distinct.length shouldBe itemsPagedByOne.length + } } it should "read all items" in { @@ -180,11 +191,11 @@ trait ReadBehaviours extends Matchers with OptionValues { this: AnyFlatSpec => private def fetchTestItems[R <: WithExternalId with WithCreatedTime](readable: Readable[R, Id]): List[R] = { // only use rows older than 10 minutes, to exclude items created by concurrently running tests which might be deleted quickly - val minAge = Instant.now().minus(10, ChronoUnit.MINUTES) + //val minAge = Instant.now().minus(10, ChronoUnit.MINUTES) readable .list() .filter(_.externalId.isDefined) - .filter(_.createdTime.isBefore(minAge)) + //.filter(_.createdTime.isBefore(minAge)) .take(2) .compile .toList @@ -244,13 +255,13 @@ trait ReadBehaviours extends Matchers with OptionValues { this: AnyFlatSpec => def readableWithRetrieveUnknownIds[R <: WithExternalId with WithId[Long] with WithCreatedTime, W]( readable: Readable[R, Id] with RetrieveByExternalIdsWithIgnoreUnknownIds[R, Id] - with RetrieveByIdsWithIgnoreUnknownIds[R, Id] + with RetrieveByIdsWithIgnoreUnknownIds[R, Id], + nonExistentId: Long = ThreadLocalRandom.current().nextLong(1, 9007199254740991L), + nonExistentExternalId: String = s"does-not-exist/${UUID.randomUUID.toString}" ): Unit = { val firstTwoItemItems = fetchTestItems(readable) val firstTwoExternalIds = firstTwoItemItems.map(_.externalId.value) val firstTwoIds = firstTwoItemItems.map(_.id) - val nonExistentExternalId = s"does-not-exist/${UUID.randomUUID.toString}" - val nonExistentId = ThreadLocalRandom.current().nextLong(1, 9007199254740991L) it should "support retrieving items by external id with ignoreUnknownIds=true" in { firstTwoExternalIds should have size 2 @@ -276,7 +287,8 @@ trait ReadBehaviours extends Matchers with OptionValues { this: AnyFlatSpec => ignoreUnknownIds = false ) } - exception.message should include("ids not found") + exception.message should include("not found") + exception.missingExternalIds shouldBe Seq(nonExistentExternalId) } it should "support retrieving items by id with ignoreUnknownIds=true" in { @@ -291,7 +303,8 @@ trait ReadBehaviours extends Matchers with OptionValues { this: AnyFlatSpec => val exception = intercept[CdpApiException] { readable.retrieveByIds(firstTwoIds ++ Seq(nonExistentId), ignoreUnknownIds = false) } - exception.message should include("ids not found") + exception.message should include("not found") + exception.missingInternalIds shouldBe Seq(nonExistentId) } } } diff --git a/src/test/scala/com/cognite/sdk/scala/common/SdkTestSpec.scala b/src/test/scala/com/cognite/sdk/scala/common/SdkTestSpec.scala index 1fa9b2359..dfa1ec259 100644 --- a/src/test/scala/com/cognite/sdk/scala/common/SdkTestSpec.scala +++ b/src/test/scala/com/cognite/sdk/scala/common/SdkTestSpec.scala @@ -48,7 +48,7 @@ abstract class SdkTestSpec extends AnyFlatSpec with Matchers { "scala-sdk-test", auth)(implicitly, sttpBackend) lazy val greenfieldClient: GenericClient[Id] = GenericClient.forAuth[Id]( - "scala-sdk-test", greenfieldAuth, "https://greenfield.cognitedata.com")(implicitly, sttpBackend) + "scala-sdk-test", greenfieldAuth, "http://localhost:4001", apiVersion = Some("playground"))(implicitly, sttpBackend) lazy val projectName: String = client.login.status().project diff --git a/src/test/scala/com/cognite/sdk/scala/common/WritableBehaviors.scala b/src/test/scala/com/cognite/sdk/scala/common/WritableBehaviors.scala index 977a884e1..259943714 100644 --- a/src/test/scala/com/cognite/sdk/scala/common/WritableBehaviors.scala +++ b/src/test/scala/com/cognite/sdk/scala/common/WritableBehaviors.scala @@ -279,6 +279,22 @@ trait WritableBehaviors extends Matchers with OptionValues { this: AnyFlatSpec = maybeDeletable.map(_.deleteByIds(updatedItems.map(_.id))) } + @SuppressWarnings(Array("org.wartremover.warts.OptionPartial")) + def updatableByBothIds[R <: ToCreate[W] with ToUpdate[U] with WithId[Long] with WithExternalId, W, U]( + resource: Create[R, W, Id] with UpdateById[R, U, Id] with UpdateByExternalId[R, U, Id] with RetrieveByIds[R, Id], + maybeDeletable: Option[DeleteByIds[Id, Long] with DeleteByExternalIds[Id]], + itemsToCreate: Seq[R], + updatesToMake: Seq[U], + expectedBehaviors: (Seq[R], Seq[R]) => Unit + ): Unit = { + val externalIdUpdates = updatesToMake.zip(itemsToCreate).map { + case (u, c) => c.externalId.get -> u + }.toMap + updatableById(resource, maybeDeletable, itemsToCreate, updatesToMake, expectedBehaviors) + updatableByExternalId(resource, maybeDeletable, itemsToCreate, externalIdUpdates, expectedBehaviors) + } + + def deletableWithIgnoreUnknownIds[R <: ToCreate[W] with WithId[Long], W, PrimitiveId]( writable: Create[R, W, Id] with DeleteByIdsWithIgnoreUnknownIds[Id, Long] diff --git a/src/test/scala/com/cognite/sdk/scala/v1/TransformationsTest.scala b/src/test/scala/com/cognite/sdk/scala/v1/TransformationsTest.scala new file mode 100644 index 000000000..1ad81f1d9 --- /dev/null +++ b/src/test/scala/com/cognite/sdk/scala/v1/TransformationsTest.scala @@ -0,0 +1,282 @@ +// Copyright 2020 Cognite AS +// SPDX-License-Identifier: Apache-2.0 + +package com.cognite.sdk.scala.v1 + +import com.cognite.sdk.scala.common._ +import io.circe.Json +import io.circe.generic.auto._ + +import java.time.Instant + +@SuppressWarnings(Array("org.wartremover.warts.TraversableOps", "org.wartremover.warts.NonUnitStatements", "org.wartremover.warts.Null")) +class TransformationsTest extends SdkTestSpec with ReadBehaviours with WritableBehaviors with RetryWhile { + private val idsThatDoNotExist = Seq(999991L, 999992L) + private val externalIdsThatDoNotExist = Seq("5PNii0w4GCDBvXPZ", "6VhKQqtTJqBHGulw") + + it should behave like readable(greenfieldClient.transformations) + + it should behave like readableWithRetrieve(greenfieldClient.transformations, idsThatDoNotExist, supportsMissingAndThrown = true) + + it should behave like readableWithRetrieveByExternalId(greenfieldClient.transformations, externalIdsThatDoNotExist, supportsMissingAndThrown = true) + + it should behave like readableWithRetrieveUnknownIds(greenfieldClient.transformations, nonExistentId = 999992) + + it should behave like writable( + greenfieldClient.transformations, + Some(greenfieldClient.transformations), + Seq(TransformationRead( + name = "scala-sdk-read-example-1", + id = 0, + query = "select 1", + destination = Json.obj("type" -> Json.fromString("events")), + conflictMode = "upsert")), + + Seq(TransformationCreate(name = "scala-sdk-create-example-1")), + idsThatDoNotExist, + supportsMissingAndThrown = true + ) + + it should behave like writableWithExternalId( + greenfieldClient.transformations, + Some(greenfieldClient.transformations), + Seq(TransformationRead( + name = "scala-sdk-read-example-2", + id = 0, + query = "select 1", + destination = Json.obj("type" -> Json.fromString("events")), + conflictMode = "upsert", + externalId = Some(shortRandom()))), + + Seq(TransformationCreate(name = "scala-sdk-create-example-2", externalId = Some(shortRandom()))), + externalIdsThatDoNotExist, + supportsMissingAndThrown = true + ) + + it should behave like deletableWithIgnoreUnknownIds( + greenfieldClient.transformations, + Seq( + TransformationRead( + name = "scala-sdk-read-example-2", + id = 0, + query = "select 1", + destination = Json.obj("type" -> Json.fromString("events")), + conflictMode = "upsert", + externalId = Some(shortRandom())) + ), + idsThatDoNotExist + ) + + private val transformsToCreate = Seq( + TransformationRead( + name = "scala-sdk-read-example-2", + id = 0, + query = "select 1", + destination = Json.obj("type" -> Json.fromString("events")), + conflictMode = "upsert", + externalId = Some(shortRandom())), + TransformationRead( + name = "scala-sdk-read-example-2", + id = 0, + query = "select 1", + destination = Json.obj("type" -> Json.fromString("events")), + conflictMode = "upsert", + externalId = Some(shortRandom())) + ) + private val transformUpdates = Seq( + TransformationRead( + name = "scala-sdk-read-example-2-1", + id = 0, + query = "select 1", + destination = Json.obj("type" -> Json.fromString("events")), + conflictMode = "upsert", + externalId = Some(shortRandom())), // scalastyle:ignore null + TransformationRead( + name = "scala-sdk-read-example-2-1", + id = 0, + query = "select 1", + destination = Json.obj("type" -> Json.fromString("events")), + conflictMode = "upsert", + externalId = Some(shortRandom())) + ) + it should behave like updatable( + greenfieldClient.transformations, + Some(greenfieldClient.transformations), + transformsToCreate, + transformUpdates, + (id: Long, item: TransformationRead) => item.copy(id = id), + (a: TransformationRead, b: TransformationRead) => { + a === b + }, + (read: Seq[TransformationRead], updated: Seq[TransformationRead]) => { + transformsToCreate.size shouldBe updated.size + read.size shouldBe updated.size + updated.size shouldBe transformUpdates.size + updated.map(_.name) shouldBe read.map(read => s"${read.name}-1") + () + } + ) + + it should behave like updatableByBothIds( + greenfieldClient.transformations, + Some(greenfieldClient.transformations), + transformsToCreate, + Seq( + TransformationUpdate(name = Some(SetValue("scala-sdk-update-1-1"))), + TransformationUpdate( + destination = Some(SetValue(Json.obj("type" -> Json.fromString("datapoints")))), + query = Some(SetValue("select 2")), + sourceApiKey = Some(SetValue(greenfieldAuth.asInstanceOf[ApiKeyAuth].apiKey)), + isPublic = Some(SetValue(true)) + ) + ), + (read: Seq[TransformationRead], updated: Seq[TransformationRead]) => { + assert(read.size == updated.size) + assert(read.size == transformsToCreate.size) + assert(read.size == transformUpdates.size) + updated.map(_.name) shouldBe List("scala-sdk-update-1-1", "scala-sdk-read-example-2") + assert(updated(1).isPublic) + assert(!read(1).isPublic) + assert(updated(1).hasSourceApiKey) + updated(1).query shouldBe "select 2" + () + } + ) + + case class RawAggregationResponse(average: Double) + + it should "query average" in { + greenfieldClient.transformations.list().compile.toList + val response = greenfieldClient.transformations.queryOne[RawAggregationResponse]( + "select avg(` V1 vcross (m/s)`) as average from ORCA.VAN_net" + ) + assert(response.average > -1) + assert(response.average < 1) + } + + case class AssetIdentifier(id: Long, externalId: Option[String], name: String) + + it should "query assets" in { + val assetId = "scalasdk-transforms-" + shortRandom() + greenfieldClient.assets.createOne(AssetCreate( + name = assetId, + externalId = Some(assetId), + description = Some("Test asset for transformations") + )) + + val response = + retryWithExpectedResult[Seq[AssetIdentifier]]( + greenfieldClient.transformations.query[AssetIdentifier]( + s"""select externalId, id, name + from _cdf.assets + where name = "$assetId" + """, + sourceLimit = None + ).results.items, + response => assert(response.length == 1) + ) + response.head.externalId shouldBe Some(assetId) + response.head.name shouldBe assetId + } +} +@SuppressWarnings(Array("org.wartremover.warts.TraversableOps", "org.wartremover.warts.NonUnitStatements", "org.wartremover.warts.Null")) +class TransformationSchedulesTest extends SdkTestSpec with ReadBehaviours with WritableBehaviors with RetryWhile { + private val idsThatDoNotExist = Seq(999991L, 999992L) + private val externalIdsThatDoNotExist = Seq("5PNii0w4GCDBvXPZ", "6VhKQqtTJqBHGulw") + private val someTransformation = + greenfieldClient.transformations.createOne(TransformationCreate( + name = "scala-sdk-create-noschedule", + externalId = Some("transforms-test1-" + shortRandom()), + query = Some("select 1"), + destination = Some(Json.obj("type" -> Json.fromString("events"))), + destinationApiKey = Some(greenfieldAuth.asInstanceOf[ApiKeyAuth].apiKey), + sourceApiKey = Some(greenfieldAuth.asInstanceOf[ApiKeyAuth].apiKey) + )) + private val someTransformationWithSchedule = + greenfieldClient.transformations.createOne(TransformationCreate( + name = "scala-sdk-create-schedule", + externalId = Some("transforms-test2-" + shortRandom()), + query = Some("select 1"), + destination = Some(Json.obj("type" -> Json.fromString("events"))), + destinationApiKey = Some(greenfieldAuth.asInstanceOf[ApiKeyAuth].apiKey), + sourceApiKey = Some(greenfieldAuth.asInstanceOf[ApiKeyAuth].apiKey) + )) + private val anotherTransformationWithSchedule = + greenfieldClient.transformations.createOne(TransformationCreate( + name = "scala-sdk-create-schedule2", + externalId = Some("transforms-test3-" + shortRandom()), + query = Some("select 1"), + destination = Some(Json.obj("type" -> Json.fromString("events"))), + destinationApiKey = Some(greenfieldAuth.asInstanceOf[ApiKeyAuth].apiKey), + sourceApiKey = Some(greenfieldAuth.asInstanceOf[ApiKeyAuth].apiKey) + )) + private val theSchedule = greenfieldClient.transformations.schedules.createOne(TransformationScheduleCreate( + interval = "1 1 1 1 1", + id = Some(someTransformationWithSchedule.id), + isPaused = Some(true) + )) + private val anotherSchedule = greenfieldClient.transformations.schedules.createOne(TransformationScheduleCreate( + interval = "1 1 1 1 1", + externalId = anotherTransformationWithSchedule.externalId, + isPaused = Some(true) + )) + + it should behave like readable(greenfieldClient.transformations.schedules) + + it should behave like readableWithRetrieve(greenfieldClient.transformations.schedules, idsThatDoNotExist, supportsMissingAndThrown = true) + + it should behave like readableWithRetrieveByExternalId(greenfieldClient.transformations.schedules, externalIdsThatDoNotExist, supportsMissingAndThrown = true) + + it should behave like readableWithRetrieveUnknownIds(greenfieldClient.transformations.schedules, nonExistentId = 999992) + + it should behave like writable( + greenfieldClient.transformations.schedules, + Some(greenfieldClient.transformations.schedules), + Seq(TransformationScheduleRead( + id = someTransformation.id, + externalId = None, + createdTime = Instant.now(), + interval = "1 1 1 1 1", + isPaused = false)), + + Seq(TransformationScheduleCreate("1 1 1 1 1", id = Some(someTransformation.id))), + idsThatDoNotExist, + supportsMissingAndThrown = true + ) + + it should behave like writableWithExternalId( + greenfieldClient.transformations, + Some(greenfieldClient.transformations), + Seq(TransformationRead( + name = "scala-sdk-read-example-2", + id = 0, + query = "select 1", + destination = Json.obj("type" -> Json.fromString("events")), + conflictMode = "upsert", + externalId = Some(shortRandom()))), + + Seq(TransformationCreate(name = "scala-sdk-create-example-2", externalId = Some(shortRandom()))), + externalIdsThatDoNotExist, + supportsMissingAndThrown = true + ) + + it should behave like deletableWithIgnoreUnknownIds( + greenfieldClient.transformations, + Seq( + TransformationRead( + name = "scala-sdk-read-example-2", + id = 0, + query = "select 1", + destination = Json.obj("type" -> Json.fromString("events")), + conflictMode = "upsert", + externalId = Some(shortRandom())) + ), + idsThatDoNotExist + ) + + it should "delete schedule and transforms" in { + greenfieldClient.transformations.schedules.deleteById(theSchedule.id) + greenfieldClient.transformations.schedules.deleteByExternalId(anotherSchedule.externalId.get) + greenfieldClient.transformations.deleteByIds(Seq(someTransformation.id, someTransformationWithSchedule.id)) + } +}