Skip to content
This repository was archived by the owner on Dec 4, 2023. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,28 @@ version := "0.2.5"

organization := "io.findify"

scalaVersion := "2.12.4"
scalaVersion := "2.12.8"

crossScalaVersions := Seq("2.11.11", "2.12.4")

val akkaVersion = "2.5.11"
val akkaVersion = "2.5.21"

licenses := Seq("MIT" -> url("https://opensource.org/licenses/MIT"))

homepage := Some(url("https://github.com/findify/s3mock"))

libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-stream" % akkaVersion,
"com.typesafe.akka" %% "akka-http" % "10.1.0",
"com.typesafe.akka" %% "akka-http" % "10.1.8",
"com.typesafe.akka" %% "akka-stream-testkit" % akkaVersion % "test",
"org.scala-lang.modules" %% "scala-xml" % "1.1.0",
"com.github.pathikrit" %% "better-files" % "3.4.0",
"com.typesafe.scala-logging" %% "scala-logging" % "3.8.0",
"com.amazonaws" % "aws-java-sdk-s3" % "1.11.294",
"org.scalatest" %% "scalatest" % "3.0.5" % "test",
"org.scala-lang.modules" %% "scala-xml" % "1.1.1",
"com.github.pathikrit" %% "better-files" % "3.7.1",
"com.typesafe.scala-logging" %% "scala-logging" % "3.9.2",
"com.amazonaws" % "aws-java-sdk-s3" % "1.11.524",
"org.scalatest" %% "scalatest" % "3.0.7" % "test",
"ch.qos.logback" % "logback-classic" % "1.2.3" % "test",
"org.iq80.leveldb" % "leveldb" % "0.10",
"com.lightbend.akka" %% "akka-stream-alpakka-s3" % "0.17" % "test"
"org.iq80.leveldb" % "leveldb" % "0.11",
"com.lightbend.akka" %% "akka-stream-alpakka-s3" % "0.20" % "test"
)

parallelExecution in Test := false
Expand Down Expand Up @@ -59,7 +59,7 @@ mainClass in assembly := Some("io.findify.s3mock.Main")
test in assembly := {}

dockerfile in docker := new Dockerfile {
from("openjdk:9.0.1-11-jre-slim")
from("openjdk:11-slim")
expose(8001)
add(assembly.value, "/app/s3mock.jar")
entryPoint("java", "-Xmx128m", "-jar", "--add-modules", "java.xml.bind", "/app/s3mock.jar")
Expand Down
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version = 1.1.0
sbt.version = 1.2.8
13 changes: 6 additions & 7 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
logLevel := Level.Warn

addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.3.3")
addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.9.0")
addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "2.3")
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.1")
addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.7")
addSbtPlugin("se.marcuslonnberg" % "sbt-docker" % "1.5.0")
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.6")
addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.4.0")
addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.9.2")
addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "2.5")
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.2-1")
addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.10")
addSbtPlugin("se.marcuslonnberg" % "sbt-docker" % "1.5.0")
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.9")
10 changes: 8 additions & 2 deletions src/main/scala/io/findify/s3mock/provider/FileProvider.scala
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,12 @@ class FileProvider(dir:String) extends Provider with LazyLogging {
GetObjectData(file.byteArray, meta)
}

override def putObjectMultipartStart(bucket:String, key:String):InitiateMultipartUploadResult = {
override def putObjectMultipartStart(bucket:String, key:String, metadata: ObjectMetadata):InitiateMultipartUploadResult = {
val id = Math.abs(Random.nextLong()).toString
val bucketFile = File(s"$dir/$bucket")
if (!bucketFile.exists) throw NoSuchBucketException(bucket)
File(s"$dir/.mp/$bucket/$key/$id/.keep").createIfNotExists(createParents = true)
metadataStore.put(bucket, key, metadata)
logger.debug(s"starting multipart upload for s3://$bucket/$key")
InitiateMultipartUploadResult(bucket, key, id)
}
Expand All @@ -116,8 +117,13 @@ class FileProvider(dir:String) extends Provider with LazyLogging {
val data = parts.fold(Array[Byte]())(_ ++ _)
file.writeBytes(data.toIterator)
File(s"$dir/.mp/$bucket/$key").delete()
val hash = file.md5
metadataStore.get(bucket, key).foreach {m =>
m.setContentMD5(hash)
m.setLastModified(org.joda.time.DateTime.now().toDate)
}
logger.debug(s"completed multipart upload for s3://$bucket/$key")
CompleteMultipartUploadResult(bucket, key, file.md5)
CompleteMultipartUploadResult(bucket, key, hash)
}

override def copyObject(sourceBucket: String, sourceKey: String, destBucket: String, destKey: String, newMeta: Option[ObjectMetadata] = None): CopyObjectResult = {
Expand Down
10 changes: 8 additions & 2 deletions src/main/scala/io/findify/s3mock/provider/InMemoryProvider.scala
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,12 @@ class InMemoryProvider extends Provider with LazyLogging {
}
}

override def putObjectMultipartStart(bucket: String, key: String): InitiateMultipartUploadResult = {
override def putObjectMultipartStart(bucket: String, key: String, metadata: ObjectMetadata): InitiateMultipartUploadResult = {
bucketDataStore.get(bucket) match {
case Some(_) =>
val id = Math.abs(Random.nextLong()).toString
multipartTempStore.putIfAbsent(id, new mutable.TreeSet)
metadataStore.put(bucket, key, metadata)
logger.debug(s"starting multipart upload for s3://$bucket/$key")
InitiateMultipartUploadResult(bucket, key, id)
case None => throw NoSuchBucketException(bucket)
Expand All @@ -128,7 +129,12 @@ class InMemoryProvider extends Provider with LazyLogging {
bucketContent.keysInBucket.put(key, KeyContents(DateTime.now, completeBytes))
multipartTempStore.remove(uploadId)
logger.debug(s"completed multipart upload for s3://$bucket/$key")
CompleteMultipartUploadResult(bucket, key, DigestUtils.md5Hex(completeBytes))
val hash = DigestUtils.md5Hex(completeBytes)
metadataStore.get(bucket, key).foreach {m =>
m.setContentMD5(hash)
m.setLastModified(org.joda.time.DateTime.now().toDate)
}
CompleteMultipartUploadResult(bucket, key, hash)
case None => throw NoSuchBucketException(bucket)
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/io/findify/s3mock/provider/Provider.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ trait Provider {
def createBucket(name:String, bucketConfig:CreateBucketConfiguration):CreateBucket
def putObject(bucket:String, key:String, data:Array[Byte], metadata: ObjectMetadata):Unit
def getObject(bucket:String, key:String): GetObjectData
def putObjectMultipartStart(bucket:String, key:String):InitiateMultipartUploadResult
def putObjectMultipartStart(bucket:String, key:String, metadata: ObjectMetadata):InitiateMultipartUploadResult
def putObjectMultipartPart(bucket:String, key:String, partNumber:Int, uploadId:String, data:Array[Byte]):Unit
def putObjectMultipartComplete(bucket:String, key:String, uploadId:String, request:CompleteMultipartUpload):CompleteMultipartUploadResult
def deleteObject(bucket:String, key:String):Unit
Expand Down
75 changes: 75 additions & 0 deletions src/main/scala/io/findify/s3mock/route/MetadataUtil.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package io.findify.s3mock.route

import java.lang.Iterable
import java.util

import akka.http.javadsl.model.HttpHeader
import akka.http.scaladsl.model.HttpRequest
import com.amazonaws.AmazonClientException
import com.amazonaws.services.s3.Headers
import com.amazonaws.services.s3.internal.ServiceUtils
import com.amazonaws.services.s3.model.ObjectMetadata
import com.amazonaws.util.{DateUtils, StringUtils}
import com.typesafe.scalalogging.LazyLogging

import scala.collection.JavaConverters._

object MetadataUtil extends LazyLogging {

  /**
    * Builds an [[ObjectMetadata]] from the headers of an incoming HTTP request,
    * mirroring how the AWS SDK extracts object metadata from S3 responses.
    *
    *  - Headers starting with the `x-amz-meta-` prefix become user metadata
    *    (the prefix is stripped from the key).
    *  - Well-known headers (Last-Modified, Content-Length, ETag, Expires,
    *    x-amz-mp-parts-count) are parsed into typed values.
    *  - Any other header is stored verbatim as a raw metadata header.
    *
    * NOTE(review): unlike the AWS SDK response handler, this does NOT skip
    * transport headers such as Date/Server/Connection — they are stored in the
    * raw metadata map too. Confirm whether callers rely on that before filtering.
    *
    * @param request the incoming HTTP request whose headers carry the metadata
    * @return populated metadata; content type falls back to the request entity's
    *         content type when no Content-Type header was present
    * @throws AmazonClientException if Content-Length or x-amz-mp-parts-count
    *                               contains non-numeric data
    */
  def populateObjectMetadata(request: HttpRequest): ObjectMetadata = {
    val metadata = new ObjectMetadata()

    val headers: Iterable[HttpHeader] = request.getHeaders()
    for (header <- headers.asScala) {
      val name = header.name()
      val value = header.value()
      if (StringUtils.beginsWithIgnoreCase(name, Headers.S3_USER_METADATA_PREFIX)) {
        // user metadata: strip the x-amz-meta- prefix from the key
        metadata.addUserMetadata(name.substring(Headers.S3_USER_METADATA_PREFIX.length), value)
      } else if (name.equalsIgnoreCase(Headers.LAST_MODIFIED)) {
        // unparsable dates are logged and ignored, matching AWS SDK behaviour
        try metadata.setHeader(name, ServiceUtils.parseRfc822Date(value))
        catch {
          case pe: Exception => logger.warn("Unable to parse last modified date: " + value, pe)
        }
      } else if (name.equalsIgnoreCase(Headers.CONTENT_LENGTH)) {
        try metadata.setHeader(name, java.lang.Long.parseLong(value))
        catch {
          case nfe: NumberFormatException =>
            throw new AmazonClientException("Unable to parse content length. Header 'Content-Length' has corrupted data" + nfe.getMessage, nfe)
        }
      } else if (name.equalsIgnoreCase(Headers.ETAG)) {
        metadata.setHeader(name, ServiceUtils.removeQuotes(value))
      } else if (name.equalsIgnoreCase(Headers.EXPIRES)) {
        try metadata.setHttpExpiresDate(DateUtils.parseRFC822Date(value))
        catch {
          case pe: Exception => logger.warn("Unable to parse http expiration date: " + value, pe)
        }
      } else if (name.equalsIgnoreCase(Headers.S3_PARTS_COUNT)) {
        try metadata.setHeader(name, value.toInt)
        catch {
          case nfe: NumberFormatException =>
            throw new AmazonClientException("Unable to parse part count. Header x-amz-mp-parts-count has corrupted data" + nfe.getMessage, nfe)
        }
      } else {
        // any other header (including transport headers) is stored verbatim
        metadata.setHeader(name, value)
      }
    }

    // fall back to the entity's content type when no Content-Type header was set
    if (metadata.getContentType == null) {
      metadata.setContentType(request.entity.getContentType.toString)
    }
    metadata
  }
}
64 changes: 1 addition & 63 deletions src/main/scala/io/findify/s3mock/route/PutObject.scala
Original file line number Diff line number Diff line change
@@ -1,26 +1,17 @@
package io.findify.s3mock.route

import java.lang.Iterable
import java.util

import akka.http.javadsl.model.HttpHeader
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, StatusCodes}
import akka.http.scaladsl.server.Directives._
import akka.stream.Materializer
import akka.stream.scaladsl.Sink
import akka.util.ByteString
import com.amazonaws.AmazonClientException
import com.amazonaws.services.s3.Headers
import com.amazonaws.services.s3.internal.ServiceUtils
import com.amazonaws.services.s3.model.ObjectMetadata
import com.amazonaws.util.{DateUtils, StringUtils}
import com.typesafe.scalalogging.LazyLogging
import io.findify.s3mock.S3ChunkedProtocolStage
import io.findify.s3mock.error.{InternalErrorException, NoSuchBucketException}
import io.findify.s3mock.provider.Provider
import org.apache.commons.codec.digest.DigestUtils

import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}

/**
Expand Down Expand Up @@ -95,60 +86,7 @@ case class PutObject(implicit provider:Provider, mat:Materializer) extends LazyL
}

private def populateObjectMetadata(request: HttpRequest, bytes: Array[Byte]): ObjectMetadata = {
val metadata = new ObjectMetadata()
val ignoredHeaders: util.HashSet[String] = new util.HashSet[String]()
ignoredHeaders.add(Headers.DATE)
ignoredHeaders.add(Headers.SERVER)
ignoredHeaders.add(Headers.REQUEST_ID)
ignoredHeaders.add(Headers.EXTENDED_REQUEST_ID)
ignoredHeaders.add(Headers.CLOUD_FRONT_ID)
ignoredHeaders.add(Headers.CONNECTION)

val headers: Iterable[HttpHeader] = request.getHeaders()
for (header <- headers.asScala) {
var key: String = header.name()
if (StringUtils.beginsWithIgnoreCase(key, Headers.S3_USER_METADATA_PREFIX)) {
key = key.substring(Headers.S3_USER_METADATA_PREFIX.length)
metadata.addUserMetadata(key, header.value())
}
// else if (ignoredHeaders.contains(key)) {
// ignore...
// }
else if (key.equalsIgnoreCase(Headers.LAST_MODIFIED)) try
metadata.setHeader(key, ServiceUtils.parseRfc822Date(header.value()))

catch {
case pe: Exception => logger.warn("Unable to parse last modified date: " + header.value(), pe)
}
else if (key.equalsIgnoreCase(Headers.CONTENT_LENGTH)) try
metadata.setHeader(key, java.lang.Long.parseLong(header.value()))

catch {
case nfe: NumberFormatException => throw new AmazonClientException("Unable to parse content length. Header 'Content-Length' has corrupted data" + nfe.getMessage, nfe)
}
else if (key.equalsIgnoreCase(Headers.ETAG)) metadata.setHeader(key, ServiceUtils.removeQuotes(header.value()))
else if (key.equalsIgnoreCase(Headers.EXPIRES)) try
metadata.setHttpExpiresDate(DateUtils.parseRFC822Date(header.value()))

catch {
case pe: Exception => logger.warn("Unable to parse http expiration date: " + header.value(), pe)
}
// else if (key.equalsIgnoreCase(Headers.EXPIRATION)) new ObjectExpirationHeaderHandler[ObjectMetadata]().handle(metadata, response)
// else if (key.equalsIgnoreCase(Headers.RESTORE)) new ObjectRestoreHeaderHandler[ObjectRestoreResult]().handle(metadata, response)
// else if (key.equalsIgnoreCase(Headers.REQUESTER_CHARGED_HEADER)) new S3RequesterChargedHeaderHandler[S3RequesterChargedResult]().handle(metadata, response)
else if (key.equalsIgnoreCase(Headers.S3_PARTS_COUNT)) try
metadata.setHeader(key, header.value().toInt)

catch {
case nfe: NumberFormatException => throw new AmazonClientException("Unable to parse part count. Header x-amz-mp-parts-count has corrupted data" + nfe.getMessage, nfe)
}
else metadata.setHeader(key, header.value())
}

if(metadata.getContentType == null){
metadata.setContentType(request.entity.getContentType.toString)
}
metadata.getRawMetadata
val metadata = MetadataUtil.populateObjectMetadata(request)
metadata.setContentMD5(DigestUtils.md5Hex(bytes))
metadata
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,30 @@ import scala.util.{Failure, Success, Try}
*/
case class PutObjectMultipartStart(implicit provider:Provider) extends LazyLogging {
def route(bucket:String, path:String) = post {
parameter('uploads) { mp =>
complete {
logger.info(s"multipart upload start to $bucket/$path")
Try(provider.putObjectMultipartStart(bucket, path)) match {
case Success(result) =>
HttpResponse(
StatusCodes.OK,
entity = HttpEntity(
ContentTypes.`application/octet-stream`, result.toXML.toString().getBytes(StandardCharsets.UTF_8)
extractRequest { request =>
parameter('uploads) { mp =>
complete {
val metadata = MetadataUtil.populateObjectMetadata(request)
logger.info(s"multipart upload start to $bucket/$path")
Try(provider.putObjectMultipartStart(bucket, path, metadata)) match {
case Success(result) =>
HttpResponse(
StatusCodes.OK,
entity = HttpEntity(
ContentTypes.`application/octet-stream`, result.toXML.toString().getBytes(StandardCharsets.UTF_8)
)
)
)
case Failure(e: NoSuchBucketException) =>
HttpResponse(
StatusCodes.NotFound,
entity = e.toXML.toString()
)
case Failure(t) =>
HttpResponse(
StatusCodes.InternalServerError,
entity = InternalErrorException(t).toXML.toString()
)
case Failure(e: NoSuchBucketException) =>
HttpResponse(
StatusCodes.NotFound,
entity = e.toXML.toString()
)
case Failure(t) =>
HttpResponse(
StatusCodes.InternalServerError,
entity = InternalErrorException(t).toXML.toString()
)
}
}
}
}
Expand Down
18 changes: 18 additions & 0 deletions src/test/scala/io/findify/s3mock/MultipartUploadTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -84,5 +84,23 @@ class MultipartUploadTest extends S3MockTest {
exc.getStatusCode shouldBe 404
exc.getErrorCode shouldBe "NoSuchBucket"
}

it should "upload multipart with metadata" in {
  s3.createBucket("getput")

  // metadata supplied at multipart-initiation time must survive the whole upload
  val uploadMeta: ObjectMetadata = new ObjectMetadata()
  uploadMeta.setContentType("application/json")
  uploadMeta.addUserMetadata("metamaic", "maic")

  val init = s3.initiateMultipartUpload(new InitiateMultipartUploadRequest("getput", "foo4", uploadMeta))

  // uploads a single 10-byte part for the in-flight multipart upload
  def uploadPart(partNumber: Int, payload: String) =
    s3.uploadPart(
      new UploadPartRequest()
        .withBucketName("getput")
        .withPartSize(10)
        .withKey("foo4")
        .withPartNumber(partNumber)
        .withUploadId(init.getUploadId)
        .withInputStream(new ByteArrayInputStream(payload.getBytes()))
    )

  val etags = List(uploadPart(1, "hellohello").getPartETag, uploadPart(2, "worldworld").getPartETag)
  val result = s3.completeMultipartUpload(new CompleteMultipartUploadRequest("getput", "foo4", init.getUploadId, etags.asJava))
  result.getKey shouldBe "foo4"

  val s3Object = s3.getObject("getput", "foo4")
  getContent(s3Object) shouldBe "hellohelloworldworld"

  val storedMeta: ObjectMetadata = s3Object.getObjectMetadata
  storedMeta.getContentType shouldBe "application/json"
  storedMeta.getUserMetadata.get("metamaic") shouldBe "maic"
}
}
}