From 0c5b63b1e153b30f0de97544b0650dc203296a88 Mon Sep 17 00:00:00 2001 From: svezfaz Date: Wed, 29 Mar 2017 22:48:26 +0100 Subject: [PATCH] S3 - add documentation #103 --- docs/src/main/paradox/connectors.md | 1 + docs/src/main/paradox/s3.md | 82 ++++++++++ s3/src/main/resources/reference.conf | 6 +- .../akka/stream/alpakka/s3/S3Exception.scala | 2 +- .../akka/stream/alpakka/s3/S3Settings.scala | 4 +- .../alpakka/s3/javadsl/S3ClientTest.java | 66 +++++++++ .../stream/alpakka/s3/impl/S3SinkSpec.scala | 93 ------------ .../stream/alpakka/s3/impl/S3SourceSpec.scala | 46 ------ .../stream/alpakka/s3/impl/WireMockBase.scala | 88 ----------- .../s3/scaladsl/S3ClientIntegrationSpec.scala | 27 ++++ .../s3/{impl => scaladsl}/S3NoMock.scala | 4 +- .../alpakka/s3/scaladsl/S3SinkSpec.scala | 41 +++++ .../alpakka/s3/scaladsl/S3SourceSpec.scala | 47 ++++++ .../alpakka/s3/scaladsl/S3WireMockBase.scala | 140 ++++++++++++++++++ 14 files changed, 411 insertions(+), 236 deletions(-) create mode 100644 docs/src/main/paradox/s3.md create mode 100644 s3/src/test/java/akka/stream/alpakka/s3/javadsl/S3ClientTest.java delete mode 100644 s3/src/test/scala/akka/stream/alpakka/s3/impl/S3SinkSpec.scala delete mode 100644 s3/src/test/scala/akka/stream/alpakka/s3/impl/S3SourceSpec.scala delete mode 100644 s3/src/test/scala/akka/stream/alpakka/s3/impl/WireMockBase.scala create mode 100644 s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3ClientIntegrationSpec.scala rename s3/src/test/scala/akka/stream/alpakka/s3/{impl => scaladsl}/S3NoMock.scala (94%) create mode 100644 s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3SinkSpec.scala create mode 100644 s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3SourceSpec.scala create mode 100644 s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3WireMockBase.scala diff --git a/docs/src/main/paradox/connectors.md b/docs/src/main/paradox/connectors.md index 59f375cbe4..17eb4fc8e7 100644 --- a/docs/src/main/paradox/connectors.md +++ b/docs/src/main/paradox/connectors.md @@ -13,6 +13,7 @@ * [HBase Connectors](hbase.md) * [JMS Connectors](jms.md) * [MQTT Connector](mqtt.md) +* [S3 Connector](s3.md) * [Server-sent Events (SSE)](sse.md) @@@ diff --git a/docs/src/main/paradox/s3.md b/docs/src/main/paradox/s3.md new file mode 100644 index 0000000000..c86ba3e110 --- /dev/null +++ b/docs/src/main/paradox/s3.md @@ -0,0 +1,82 @@ +# S3 Connector + +The S3 connector provides Akka Stream sources and sinks to connect to [Amazon S3](https://aws.amazon.com/s3/). +S3 stands for Simple Storage Service and is an object storage service with a web service interface. + +## Artifacts + +sbt +: @@@vars + ```scala + libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-s3" % "$version$" + ``` + @@@ + +Maven +: @@@vars + ```xml + + com.lightbend.akka + akka-stream-alpakka-s3_$scala.binaryVersion$ + $version$ + + ``` + @@@ + +Gradle +: @@@vars + ```gradle + dependencies { + compile group: "com.lightbend.akka", name: "akka-stream-alpakka-s3_$scala.binaryVersion$", version: "$version$" + } + ``` + @@@ + +## Usage + +### Set up your S3 clients + +The S3 connector can be configured within your `application.conf` file. + +Configuration +: @@snip (../../../../s3/src/main/resources/reference.conf) + +### Create an S3 client + +Scala +: @@snip (../../../../s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3ClientIntegrationSpec.scala) { #client } + +Java +: @@snip (../../../../s3/src/test/java/akka/stream/alpakka/s3/javadsl/S3ClientTest.java) { #client } + +### Storing a file in S3 + +Scala +: @@snip (../../../../s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3SinkSpec.scala) { #upload } + +Java +: @@snip (../../../../s3/src/test/java/akka/stream/alpakka/s3/javadsl/S3ClientTest.java) { #upload } + +### Downloading a file from S3 + +Scala +: @@snip (../../../../s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3SourceSpec.scala) { #download } + +Java +: @@snip (../../../../s3/src/test/java/akka/stream/alpakka/s3/javadsl/S3ClientTest.java) { #download } + +### Running the example code + +The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt. + +Scala +: ``` + sbt + > s3/test + ``` + +Java +: ``` + sbt + > s3/test + ``` diff --git a/s3/src/main/resources/reference.conf b/s3/src/main/resources/reference.conf index 720f9a3d59..68aa78f839 100644 --- a/s3/src/main/resources/reference.conf +++ b/s3/src/main/resources/reference.conf @@ -5,14 +5,14 @@ akka.stream.alpakka.s3 { # location for temporary files, if buffer is set to "disk". If empty, uses the standard java temp path. disk-buffer-path = "" - debug-logging = false - proxy { - # Hostname of the proxy. If undefined ("") proxy is not enabled. + # hostname of the proxy. If undefined ("") proxy is not enabled. host = "" port = 8000 } + # default values for AWS configuration. If credentials and/or region are not specified when creating S3Client, + # these values will be used. aws { access-key-id = "" secret-access-key = "" diff --git a/s3/src/main/scala/akka/stream/alpakka/s3/S3Exception.scala b/s3/src/main/scala/akka/stream/alpakka/s3/S3Exception.scala index b139f42607..0899ce7dc3 100644 --- a/s3/src/main/scala/akka/stream/alpakka/s3/S3Exception.scala +++ b/s3/src/main/scala/akka/stream/alpakka/s3/S3Exception.scala @@ -3,7 +3,7 @@ */ package akka.stream.alpakka.s3 -import scala.xml.{Elem, NodeSeq, XML} +import scala.xml.{Elem, XML} class S3Exception(val code: String, val message: String, val requestID: String, val hostId: String) extends RuntimeException(message) { diff --git a/s3/src/main/scala/akka/stream/alpakka/s3/S3Settings.scala b/s3/src/main/scala/akka/stream/alpakka/s3/S3Settings.scala index 1212d1ef2e..2a83d26ddd 100644 --- a/s3/src/main/scala/akka/stream/alpakka/s3/S3Settings.scala +++ b/s3/src/main/scala/akka/stream/alpakka/s3/S3Settings.scala @@ -11,13 +11,12 @@ final case class Proxy(host: String, port: Int) final class S3Settings(val bufferType: BufferType, val diskBufferPath: String, - val debugLogging: Boolean, val proxy: Option[Proxy], val awsCredentials: AWSCredentials, val s3Region: String) { override def toString: String = - s"S3Settings($bufferType,$diskBufferPath,$debugLogging,$proxy,$awsCredentials,$s3Region)" + s"S3Settings($bufferType,$diskBufferPath,$proxy,$awsCredentials,$s3Region)" } sealed trait BufferType @@ -44,7 +43,6 @@ object S3Settings { case _ => throw new IllegalArgumentException("Buffer type must be 'memory' or 'disk'") }, diskBufferPath = config.getString("disk-buffer-path"), - debugLogging = config.getBoolean("debug-logging"), proxy = { if (config.getString("proxy.host") != "") Some(Proxy(config.getString("proxy.host"), config.getInt("proxy.port"))) diff --git a/s3/src/test/java/akka/stream/alpakka/s3/javadsl/S3ClientTest.java b/s3/src/test/java/akka/stream/alpakka/s3/javadsl/S3ClientTest.java new file mode 100644 index 0000000000..1c81efe998 --- /dev/null +++ b/s3/src/test/java/akka/stream/alpakka/s3/javadsl/S3ClientTest.java @@ -0,0 +1,66 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.s3.javadsl; + +import akka.NotUsed; +import akka.actor.ActorSystem; +import akka.http.javadsl.model.Uri; +import akka.stream.ActorMaterializer; +import akka.stream.Materializer; +import akka.stream.alpakka.s3.auth.AWSCredentials; +import akka.stream.alpakka.s3.auth.BasicCredentials; +import akka.stream.alpakka.s3.scaladsl.S3WireMockBase; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; +import akka.util.ByteString; +import org.junit.Test; + +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; + +public class S3ClientTest extends S3WireMockBase { + + final Materializer materializer = ActorMaterializer.create(system()); + + //#client + final AWSCredentials credentials = new BasicCredentials("my-AWS-access-key-ID", "my-AWS-password"); + final S3Client client = new S3Client(credentials, "us-east-1", system(), materializer); + //#client + + @Test + public void multipartUpload() throws Exception { + + mockUpload(); + + //#upload + final Sink> sink = client.multipartUpload(bucket(), bucketKey()); + //#upload + + final CompletionStage resultCompletionStage = + Source.single(ByteString.fromString(body())).runWith(sink, materializer); + + MultipartUploadResult result = resultCompletionStage.toCompletableFuture().get(5, TimeUnit.SECONDS); + + assertEquals(new MultipartUploadResult(Uri.create(url()), bucket(), bucketKey(), etag()), result); + } + + @Test + public void download() throws Exception { + + mockDownload(); + + //#download + final Source source = client.download(bucket(), bucketKey()); + //#download + + final CompletionStage resultCompletionStage = + source.map(ByteString::utf8String).runWith(Sink.head(), materializer); + + String result = resultCompletionStage.toCompletableFuture().get(5, TimeUnit.SECONDS); + + assertEquals(body(), result); + } +} diff --git a/s3/src/test/scala/akka/stream/alpakka/s3/impl/S3SinkSpec.scala b/s3/src/test/scala/akka/stream/alpakka/s3/impl/S3SinkSpec.scala deleted file mode 100644 index 4e18fc5ecb..0000000000 --- a/s3/src/test/scala/akka/stream/alpakka/s3/impl/S3SinkSpec.scala +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Copyright (C) 2016-2017 Lightbend Inc. - */ -package akka.stream.alpakka.s3.impl - -import akka.stream.ActorMaterializer -import akka.stream.alpakka.s3.S3Exception -import akka.stream.alpakka.s3.auth.AWSCredentials -import akka.stream.alpakka.s3.scaladsl.{MultipartUploadResult, S3Client} -import akka.stream.scaladsl.{Keep, Sink, Source} -import akka.util.ByteString -import com.github.tomakehurst.wiremock.client.WireMock._ - -import scala.concurrent.Await -import scala.concurrent.duration._ - -class S3SinkSpec extends WireMockBase { - - implicit val materializer = ActorMaterializer() - - "S3Sink" should "work in a happy case" in { - val body = "Some content" - val key = "testKey" - val bucket = "testBucket" - val uploadId = "VXBsb2FkIElEIGZvciA2aWWpbmcncyBteS1tb3ZpZS5tMnRzIHVwbG9hZA" - val etag = "5b27a21a97fcf8a7004dd1d906e7a5ba" - val url = s"http://testbucket.s3.amazonaws.com/testKey" - mock - .register( - post(urlEqualTo(s"/$key?uploads")).willReturn( - aResponse() - .withStatus(200) - .withHeader("x-amz-id-2", "Uuag1LuByRx9e6j5Onimru9pO4ZVKnJ2Qz7/C1NPcfTWAtRPfTaOFg==") - .withHeader("x-amz-request-id", "656c76696e6727732072657175657374") - .withBody(s""" - | - | $bucket - | $key - | $uploadId - |""".stripMargin) - ) - ) - - mock.register( - put(urlEqualTo(s"/$key?partNumber=1&uploadId=$uploadId")) - .withRequestBody(matching(body)) - .willReturn( - aResponse() - .withStatus(200) - .withHeader("x-amz-id-2", "Zn8bf8aEFQ+kBnGPBc/JaAf9SoWM68QDPS9+SyFwkIZOHUG2BiRLZi5oXw4cOCEt") - .withHeader("x-amz-request-id", "5A37448A37622243") - .withHeader("ETag", "\"" + etag + "\"") - ) - ) - - mock.register( - post(urlEqualTo(s"/$key?uploadId=$uploadId")) - .withRequestBody(containing("CompleteMultipartUpload")) - .withRequestBody(containing(etag)) - .willReturn( - aResponse() - .withStatus(200) - .withHeader("Content-Type", "application/xml; charset=UTF-8") - .withHeader("x-amz-id-2", "Zn8bf8aEFQ+kBnGPBc/JaAf9SoWM68QDPS9+SyFwkIZOHUG2BiRLZi5oXw4cOCEt") - .withHeader("x-amz-request-id", "5A37448A3762224333") - .withBody(s""" - | - | $url - | $bucket - | $key - | "$etag" - |""".stripMargin) - ) - ) - - val result = Source(ByteString(body) :: Nil) - .toMat(new S3Client(AWSCredentials("", ""), "us-east-1").multipartUpload(bucket, key))(Keep.right) - .run - - Await.ready(result, 5.seconds).futureValue shouldBe MultipartUploadResult(url, bucket, key, etag) - } - - it should "fail if request returns 404" in { - val result = new S3Client(AWSCredentials("", ""), "us-east-1") - .download("sometest4398673", "30000184.xml") - .map(_.decodeString("utf8")) - .runWith(Sink.head) - whenReady(result.failed) { e => - e shouldBe a[S3Exception] - e.asInstanceOf[S3Exception].code should equal("NoSuchKey") - } - } -} diff --git a/s3/src/test/scala/akka/stream/alpakka/s3/impl/S3SourceSpec.scala b/s3/src/test/scala/akka/stream/alpakka/s3/impl/S3SourceSpec.scala deleted file mode 100644 index fe3cf411eb..0000000000 --- a/s3/src/test/scala/akka/stream/alpakka/s3/impl/S3SourceSpec.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (C) 2016-2017 Lightbend Inc. - */ -package akka.stream.alpakka.s3.impl - -import akka.stream.ActorMaterializer -import akka.stream.alpakka.s3.S3Exception -import akka.stream.alpakka.s3.auth.AWSCredentials -import akka.stream.alpakka.s3.scaladsl.S3Client -import akka.stream.scaladsl.Sink -import com.github.tomakehurst.wiremock.client.WireMock._ - -import scala.concurrent.Await -import scala.concurrent.duration._ - -class S3SourceSpec extends WireMockBase { - implicit val mat = ActorMaterializer() - - "S3Source" should "work in a happy case" in { - val body = "Some content" - mock - .register( - get(urlEqualTo("/testKey")).willReturn( - aResponse().withStatus(200).withHeader("ETag", """"fba9dede5f27731c9771645a39863328"""").withBody(body) - ) - ) - - val result = new S3Client(AWSCredentials("", ""), "us-east-1") - .download("testBucket", "testKey") - .map(_.decodeString("utf8")) - .runWith(Sink.head) - - Await.ready(result, 5.seconds).futureValue shouldBe body - } - - it should "fail if request returns 404" in { - val result = new S3Client(AWSCredentials("", ""), "us-east-1") - .download("sometest4398673", "30000184.xml") - .map(_.decodeString("utf8")) - .runWith(Sink.head) - whenReady(result.failed) { e => - e shouldBe a[S3Exception] - e.asInstanceOf[S3Exception].code should equal("NoSuchKey") - } - } -} diff --git a/s3/src/test/scala/akka/stream/alpakka/s3/impl/WireMockBase.scala b/s3/src/test/scala/akka/stream/alpakka/s3/impl/WireMockBase.scala deleted file mode 100644 index 9f17a9e562..0000000000 --- a/s3/src/test/scala/akka/stream/alpakka/s3/impl/WireMockBase.scala +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Copyright (C) 2016-2017 Lightbend Inc. - */ -package akka.stream.alpakka.s3.impl - -import akka.actor.ActorSystem -import akka.stream.alpakka.s3.impl.WireMockBase._ -import akka.testkit.TestKit -import com.github.tomakehurst.wiremock.WireMockServer -import com.github.tomakehurst.wiremock.client.WireMock -import com.github.tomakehurst.wiremock.client.WireMock._ -import com.github.tomakehurst.wiremock.core.WireMockConfiguration._ -import com.typesafe.config.ConfigFactory -import org.scalatest.concurrent.ScalaFutures -import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers} - -import scala.concurrent.duration._ - -object WireMockBase { - def config(port: Int) = ConfigFactory.parseString( - s""" - |akka { - | loggers = ["akka.testkit.TestEventListener"] - | - | ssl-config.trustManager.stores = [ - | {type = "PEM", path = "./s3/src/test/resources/rootCA.crt"} - | ] - | stream.alpakka.s3.proxy { - | host = localhost - | port = $port - | } - |} - """.stripMargin - ) - - def getCallerName(clazz: Class[_]): String = { - val s = (Thread.currentThread.getStackTrace map (_.getClassName) drop 1) - .dropWhile(_ matches "(java.lang.Thread|.*WireMockBase.?$)") - val reduced = s.lastIndexWhere(_ == clazz.getName) match { - case -1 ⇒ s - case z ⇒ s drop (z + 1) - } - reduced.head.replaceFirst(""".*\.""", "").replaceAll("[^a-zA-Z_0-9]", "_") - } - - def initServer(): WireMockServer = { - val server = new WireMockServer( - wireMockConfig() - .dynamicPort() - .dynamicHttpsPort() - .keystorePath("./s3/src/test/resources/keystore.jks") - .keystorePassword("abcdefg") - ) - server.start() - server - } -} - -abstract class WireMockBase(_system: ActorSystem, _wireMockServer: WireMockServer) - extends TestKit(_system) - with FlatSpecLike - with BeforeAndAfterAll - with Matchers - with ScalaFutures { - - implicit val defaultPatience = - PatienceConfig(timeout = 5.seconds, interval = 100.millis) - - def this(mock: WireMockServer) = - this(ActorSystem(getCallerName(getClass), config(mock.httpsPort())), mock) - def this() = this(initServer()) - - val mock = new WireMock("localhost", _wireMockServer.port()) - - override def beforeAll(): Unit = - mock.register( - any(anyUrl()).willReturn( - aResponse() - .withStatus(404) - .withBody( - "NoSuchKeyNo key found" + - "XXXXXXXX" - ) - ) - ) - override def afterAll(): Unit = _wireMockServer.stop() - -} diff --git a/s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3ClientIntegrationSpec.scala b/s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3ClientIntegrationSpec.scala new file mode 100644 index 0000000000..13be55f52f --- /dev/null +++ b/s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3ClientIntegrationSpec.scala @@ -0,0 +1,27 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.s3.scaladsl + +import akka.actor.ActorSystem +import akka.stream.ActorMaterializer +import akka.stream.alpakka.s3.auth.AWSCredentials +import org.scalatest.concurrent.{IntegrationPatience, ScalaFutures} +import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers} + +trait S3ClientIntegrationSpec + extends FlatSpecLike + with BeforeAndAfterAll + with Matchers + with ScalaFutures + with IntegrationPatience { + + implicit val system: ActorSystem + implicit val materializer = ActorMaterializer() + + //#client + val awsCredentials = AWSCredentials(accessKeyId = "my-AWS-access-key-ID", secretAccessKey = "my-AWS-password") + val s3Client = new S3Client(credentials = awsCredentials, region = "us-east-1")(system, materializer) + //#client + +} diff --git a/s3/src/test/scala/akka/stream/alpakka/s3/impl/S3NoMock.scala b/s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3NoMock.scala similarity index 94% rename from s3/src/test/scala/akka/stream/alpakka/s3/impl/S3NoMock.scala rename to s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3NoMock.scala index c51925b71d..d027bfaae2 100644 --- a/s3/src/test/scala/akka/stream/alpakka/s3/impl/S3NoMock.scala +++ b/s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3NoMock.scala @@ -1,11 +1,11 @@ /* * Copyright (C) 2016-2017 Lightbend Inc. */ -package akka.stream.alpakka.s3.impl +package akka.stream.alpakka.s3.scaladsl import akka.actor.ActorSystem import akka.stream.ActorMaterializer -import akka.stream.alpakka.s3.scaladsl.S3Client +import akka.stream.alpakka.s3.impl.MetaHeaders import akka.stream.scaladsl.{Sink, Source} import akka.util.ByteString import org.scalatest.concurrent.ScalaFutures diff --git a/s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3SinkSpec.scala b/s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3SinkSpec.scala new file mode 100644 index 0000000000..d216e16d47 --- /dev/null +++ b/s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3SinkSpec.scala @@ -0,0 +1,41 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.s3.scaladsl + +import akka.stream.scaladsl.{Sink, Source} +import akka.util.ByteString + +import scala.concurrent.Future + +class S3SinkSpec extends S3WireMockBase with S3ClientIntegrationSpec { + + "S3Sink" should "upload a stream of bytes to S3" in { + + mockUpload() + + //#upload + val s3Sink: Sink[ByteString, Future[MultipartUploadResult]] = s3Client.multipartUpload(bucket, bucketKey) + //#upload + + val result: Future[MultipartUploadResult] = Source.single(ByteString(body)).runWith(s3Sink) + + result.futureValue shouldBe MultipartUploadResult(url, bucket, bucketKey, etag) + } + + it should "fail if request returns 404" in { + + mock404s() + + val result = Source + .single(ByteString("some contents")) + .runWith(s3Client.multipartUpload("nonexisting_bucket", "nonexisting_file.xml")) + + result.failed.futureValue.getMessage should startWith("Can't initiate upload:") + } + + override protected def afterAll(): Unit = { + super.afterAll() + stopWireMockServer() + } +} diff --git a/s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3SourceSpec.scala b/s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3SourceSpec.scala new file mode 100644 index 0000000000..4ef2da4412 --- /dev/null +++ b/s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3SourceSpec.scala @@ -0,0 +1,47 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.s3.scaladsl + +import akka.NotUsed +import akka.stream.alpakka.s3.S3Exception +import akka.stream.scaladsl.{Sink, Source} +import akka.util.ByteString + +import scala.concurrent.Future + +class S3SourceSpec extends S3WireMockBase with S3ClientIntegrationSpec { + + "S3Source" should "download a stream of bytes from S3" in { + + mockDownload() + + //#download + val s3Source: Source[ByteString, NotUsed] = s3Client.download("testBucket", "testKey") + //#download + + val result: Future[String] = s3Source.map(_.utf8String).runWith(Sink.head) + + result.futureValue shouldBe body + } + + it should "fail if request returns 404" in { + + mock404s() + + val result = s3Client + .download("nonexisting_bucket", "nonexisting_file.xml") + .map(_.utf8String) + .runWith(Sink.head) + + whenReady(result.failed) { e => + e shouldBe a[S3Exception] + e.asInstanceOf[S3Exception].code should equal("NoSuchKey") + } + } + + override protected def afterAll(): Unit = { + super.afterAll() + stopWireMockServer() + } +} diff --git a/s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3WireMockBase.scala b/s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3WireMockBase.scala new file mode 100644 index 0000000000..5c50672123 --- /dev/null +++ b/s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3WireMockBase.scala @@ -0,0 +1,140 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.s3.scaladsl + +import akka.actor.ActorSystem +import akka.testkit.TestKit +import com.github.tomakehurst.wiremock.WireMockServer +import com.github.tomakehurst.wiremock.client.WireMock +import com.github.tomakehurst.wiremock.client.WireMock._ +import com.github.tomakehurst.wiremock.core.WireMockConfiguration._ +import com.typesafe.config.ConfigFactory +import S3WireMockBase._ + +abstract class S3WireMockBase(_system: ActorSystem, _wireMockServer: WireMockServer) extends TestKit(_system) { + + def this(mock: WireMockServer) = this(ActorSystem(getCallerName(getClass), config(mock.httpsPort())), mock) + def this() = this(initServer()) + + val mock = new WireMock("localhost", _wireMockServer.port()) + + def mock404s(): Unit = + mock.register( + any(anyUrl()).willReturn( + aResponse() + .withStatus(404) + .withBody( + "NoSuchKeyNo key found" + + "XXXXXXXX" + ) + ) + ) + + def stopWireMockServer(): Unit = _wireMockServer.stop() + + val body = "Some content" + val bucketKey = "testKey" + val bucket = "testBucket" + val uploadId = "VXBsb2FkIElEIGZvciA2aWWpbmcncyBteS1tb3ZpZS5tMnRzIHVwbG9hZA" + val etag = "5b27a21a97fcf8a7004dd1d906e7a5ba" + val url = s"http://testbucket.s3.amazonaws.com/testKey" + + def mockDownload(): Unit = + mock + .register( + get(urlEqualTo("/testKey")).willReturn( + aResponse().withStatus(200).withHeader("ETag", """"fba9dede5f27731c9771645a39863328"""").withBody(body) + ) + ) + + def mockUpload(): Unit = { + mock + .register( + post(urlEqualTo(s"/$bucketKey?uploads")).willReturn( + aResponse() + .withStatus(200) + .withHeader("x-amz-id-2", "Uuag1LuByRx9e6j5Onimru9pO4ZVKnJ2Qz7/C1NPcfTWAtRPfTaOFg==") + .withHeader("x-amz-request-id", "656c76696e6727732072657175657374") + .withBody(s""" + | + | $bucket + | $bucketKey + | $uploadId + |""".stripMargin) + ) + ) + + mock.register( + put(urlEqualTo(s"/$bucketKey?partNumber=1&uploadId=$uploadId")) + .withRequestBody(matching(body)) + .willReturn( + aResponse() + .withStatus(200) + .withHeader("x-amz-id-2", "Zn8bf8aEFQ+kBnGPBc/JaAf9SoWM68QDPS9+SyFwkIZOHUG2BiRLZi5oXw4cOCEt") + .withHeader("x-amz-request-id", "5A37448A37622243") + .withHeader("ETag", "\"" + etag + "\"") + ) + ) + + mock.register( + post(urlEqualTo(s"/$bucketKey?uploadId=$uploadId")) + .withRequestBody(containing("CompleteMultipartUpload")) + .withRequestBody(containing(etag)) + .willReturn( + aResponse() + .withStatus(200) + .withHeader("Content-Type", "application/xml; charset=UTF-8") + .withHeader("x-amz-id-2", "Zn8bf8aEFQ+kBnGPBc/JaAf9SoWM68QDPS9+SyFwkIZOHUG2BiRLZi5oXw4cOCEt") + .withHeader("x-amz-request-id", "5A37448A3762224333") + .withBody(s""" + | + | $url + | $bucket + | $bucketKey + | "$etag" + |""".stripMargin) + ) + ) + } +} + +private object S3WireMockBase { + def config(port: Int) = ConfigFactory.parseString( + s""" + |akka { + | loggers = ["akka.testkit.TestEventListener"] + | + | ssl-config.trustManager.stores = [ + | {type = "PEM", path = "./s3/src/test/resources/rootCA.crt"} + | ] + | stream.alpakka.s3.proxy { + | host = localhost + | port = $port + | } + |} + """.stripMargin + ) + + def getCallerName(clazz: Class[_]): String = { + val s = (Thread.currentThread.getStackTrace map (_.getClassName) drop 1) + .dropWhile(_ matches "(java.lang.Thread|.*WireMockBase.?$)") + val reduced = s.lastIndexWhere(_ == clazz.getName) match { + case -1 ⇒ s + case z ⇒ s drop (z + 1) + } + reduced.head.replaceFirst(""".*\.""", "").replaceAll("[^a-zA-Z_0-9]", "_") + } + + def initServer(): WireMockServer = { + val server = new WireMockServer( + wireMockConfig() + .dynamicPort() + .dynamicHttpsPort() + .keystorePath("./s3/src/test/resources/keystore.jks") + .keystorePassword("abcdefg") + ) + server.start() + server + } +}