diff --git a/docs/src/main/paradox/connectors.md b/docs/src/main/paradox/connectors.md
index 59f375cbe4..17eb4fc8e7 100644
--- a/docs/src/main/paradox/connectors.md
+++ b/docs/src/main/paradox/connectors.md
@@ -13,6 +13,7 @@
* [HBase Connectors](hbase.md)
* [JMS Connectors](jms.md)
* [MQTT Connector](mqtt.md)
+* [S3 Connector](s3.md)
* [Server-sent Events (SSE)](sse.md)
@@@
diff --git a/docs/src/main/paradox/s3.md b/docs/src/main/paradox/s3.md
new file mode 100644
index 0000000000..c86ba3e110
--- /dev/null
+++ b/docs/src/main/paradox/s3.md
@@ -0,0 +1,82 @@
+# S3 Connector
+
+The S3 connector provides Akka Stream sources and sinks to connect to [Amazon S3](https://aws.amazon.com/s3/).
+S3 stands for Simple Storage Service and is an object storage service with a web service interface.
+
+## Artifacts
+
+sbt
+: @@@vars
+ ```scala
+ libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-s3" % "$version$"
+ ```
+ @@@
+
+Maven
+: @@@vars
+ ```xml
+
+ com.lightbend.akka
+ akka-stream-alpakka-s3_$scala.binaryVersion$
+ $version$
+
+ ```
+ @@@
+
+Gradle
+: @@@vars
+ ```gradle
+ dependencies {
+ compile group: "com.lightbend.akka", name: "akka-stream-alpakka-s3_$scala.binaryVersion$", version: "$version$"
+ }
+ ```
+ @@@
+
+## Usage
+
+### Set up your S3 clients
+
+The S3 connector can be configured within your `application.conf` file.
+
+Configuration
+: @@snip (../../../../s3/src/main/resources/reference.conf)
+
+### Create an S3 client
+
+Scala
+: @@snip (../../../../s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3ClientIntegrationSpec.scala) { #client }
+
+Java
+: @@snip (../../../../s3/src/test/java/akka/stream/alpakka/s3/javadsl/S3ClientTest.java) { #client }
+
+### Storing a file in S3
+
+Scala
+: @@snip (../../../../s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3SinkSpec.scala) { #upload }
+
+Java
+: @@snip (../../../../s3/src/test/java/akka/stream/alpakka/s3/javadsl/S3ClientTest.java) { #upload }
+
+### Downloading a file from S3
+
+Scala
+: @@snip (../../../../s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3SourceSpec.scala) { #download }
+
+Java
+: @@snip (../../../../s3/src/test/java/akka/stream/alpakka/s3/javadsl/S3ClientTest.java) { #download }
+
+### Running the example code
+
+The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.
+
+Scala
+: ```
+ sbt
+ > s3/test
+ ```
+
+Java
+: ```
+ sbt
+ > s3/test
+ ```
diff --git a/s3/src/main/resources/reference.conf b/s3/src/main/resources/reference.conf
index 720f9a3d59..68aa78f839 100644
--- a/s3/src/main/resources/reference.conf
+++ b/s3/src/main/resources/reference.conf
@@ -5,14 +5,14 @@ akka.stream.alpakka.s3 {
# location for temporary files, if buffer is set to "disk". If empty, uses the standard java temp path.
disk-buffer-path = ""
- debug-logging = false
-
proxy {
- # Hostname of the proxy. If undefined ("") proxy is not enabled.
+ # hostname of the proxy. If undefined ("") proxy is not enabled.
host = ""
port = 8000
}
+ # default values for AWS configuration. If credentials and/or region are not specified when creating S3Client,
+ # these values will be used.
aws {
access-key-id = ""
secret-access-key = ""
diff --git a/s3/src/main/scala/akka/stream/alpakka/s3/S3Exception.scala b/s3/src/main/scala/akka/stream/alpakka/s3/S3Exception.scala
index b139f42607..0899ce7dc3 100644
--- a/s3/src/main/scala/akka/stream/alpakka/s3/S3Exception.scala
+++ b/s3/src/main/scala/akka/stream/alpakka/s3/S3Exception.scala
@@ -3,7 +3,7 @@
*/
package akka.stream.alpakka.s3
-import scala.xml.{Elem, NodeSeq, XML}
+import scala.xml.{Elem, XML}
class S3Exception(val code: String, val message: String, val requestID: String, val hostId: String)
extends RuntimeException(message) {
diff --git a/s3/src/main/scala/akka/stream/alpakka/s3/S3Settings.scala b/s3/src/main/scala/akka/stream/alpakka/s3/S3Settings.scala
index 1212d1ef2e..2a83d26ddd 100644
--- a/s3/src/main/scala/akka/stream/alpakka/s3/S3Settings.scala
+++ b/s3/src/main/scala/akka/stream/alpakka/s3/S3Settings.scala
@@ -11,13 +11,12 @@ final case class Proxy(host: String, port: Int)
final class S3Settings(val bufferType: BufferType,
val diskBufferPath: String,
- val debugLogging: Boolean,
val proxy: Option[Proxy],
val awsCredentials: AWSCredentials,
val s3Region: String) {
override def toString: String =
- s"S3Settings($bufferType,$diskBufferPath,$debugLogging,$proxy,$awsCredentials,$s3Region)"
+ s"S3Settings($bufferType,$diskBufferPath,$proxy,$awsCredentials,$s3Region)"
}
sealed trait BufferType
@@ -44,7 +43,6 @@ object S3Settings {
case _ => throw new IllegalArgumentException("Buffer type must be 'memory' or 'disk'")
},
diskBufferPath = config.getString("disk-buffer-path"),
- debugLogging = config.getBoolean("debug-logging"),
proxy = {
if (config.getString("proxy.host") != "")
Some(Proxy(config.getString("proxy.host"), config.getInt("proxy.port")))
diff --git a/s3/src/test/java/akka/stream/alpakka/s3/javadsl/S3ClientTest.java b/s3/src/test/java/akka/stream/alpakka/s3/javadsl/S3ClientTest.java
new file mode 100644
index 0000000000..1c81efe998
--- /dev/null
+++ b/s3/src/test/java/akka/stream/alpakka/s3/javadsl/S3ClientTest.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright (C) 2016-2017 Lightbend Inc.
+ */
+package akka.stream.alpakka.s3.javadsl;
+
+import akka.NotUsed;
+import akka.actor.ActorSystem;
+import akka.http.javadsl.model.Uri;
+import akka.stream.ActorMaterializer;
+import akka.stream.Materializer;
+import akka.stream.alpakka.s3.auth.AWSCredentials;
+import akka.stream.alpakka.s3.auth.BasicCredentials;
+import akka.stream.alpakka.s3.scaladsl.S3WireMockBase;
+import akka.stream.javadsl.Sink;
+import akka.stream.javadsl.Source;
+import akka.util.ByteString;
+import org.junit.Test;
+
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+
+public class S3ClientTest extends S3WireMockBase {
+
+ final Materializer materializer = ActorMaterializer.create(system());
+
+ //#client
+ final AWSCredentials credentials = new BasicCredentials("my-AWS-access-key-ID", "my-AWS-password");
+ final S3Client client = new S3Client(credentials, "us-east-1", system(), materializer);
+ //#client
+
+ @Test
+ public void multipartUpload() throws Exception {
+
+ mockUpload();
+
+ //#upload
+ final Sink> sink = client.multipartUpload(bucket(), bucketKey());
+ //#upload
+
+ final CompletionStage resultCompletionStage =
+ Source.single(ByteString.fromString(body())).runWith(sink, materializer);
+
+ MultipartUploadResult result = resultCompletionStage.toCompletableFuture().get(5, TimeUnit.SECONDS);
+
+ assertEquals(new MultipartUploadResult(Uri.create(url()), bucket(), bucketKey(), etag()), result);
+ }
+
+ @Test
+ public void download() throws Exception {
+
+ mockDownload();
+
+ //#download
+ final Source source = client.download(bucket(), bucketKey());
+ //#download
+
+ final CompletionStage resultCompletionStage =
+ source.map(ByteString::utf8String).runWith(Sink.head(), materializer);
+
+ String result = resultCompletionStage.toCompletableFuture().get(5, TimeUnit.SECONDS);
+
+ assertEquals(body(), result);
+ }
+}
diff --git a/s3/src/test/scala/akka/stream/alpakka/s3/impl/S3SinkSpec.scala b/s3/src/test/scala/akka/stream/alpakka/s3/impl/S3SinkSpec.scala
deleted file mode 100644
index 4e18fc5ecb..0000000000
--- a/s3/src/test/scala/akka/stream/alpakka/s3/impl/S3SinkSpec.scala
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Copyright (C) 2016-2017 Lightbend Inc.
- */
-package akka.stream.alpakka.s3.impl
-
-import akka.stream.ActorMaterializer
-import akka.stream.alpakka.s3.S3Exception
-import akka.stream.alpakka.s3.auth.AWSCredentials
-import akka.stream.alpakka.s3.scaladsl.{MultipartUploadResult, S3Client}
-import akka.stream.scaladsl.{Keep, Sink, Source}
-import akka.util.ByteString
-import com.github.tomakehurst.wiremock.client.WireMock._
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
-class S3SinkSpec extends WireMockBase {
-
- implicit val materializer = ActorMaterializer()
-
- "S3Sink" should "work in a happy case" in {
- val body = "Some content"
- val key = "testKey"
- val bucket = "testBucket"
- val uploadId = "VXBsb2FkIElEIGZvciA2aWWpbmcncyBteS1tb3ZpZS5tMnRzIHVwbG9hZA"
- val etag = "5b27a21a97fcf8a7004dd1d906e7a5ba"
- val url = s"http://testbucket.s3.amazonaws.com/testKey"
- mock
- .register(
- post(urlEqualTo(s"/$key?uploads")).willReturn(
- aResponse()
- .withStatus(200)
- .withHeader("x-amz-id-2", "Uuag1LuByRx9e6j5Onimru9pO4ZVKnJ2Qz7/C1NPcfTWAtRPfTaOFg==")
- .withHeader("x-amz-request-id", "656c76696e6727732072657175657374")
- .withBody(s"""
- |
- | $bucket
- | $key
- | $uploadId
- |""".stripMargin)
- )
- )
-
- mock.register(
- put(urlEqualTo(s"/$key?partNumber=1&uploadId=$uploadId"))
- .withRequestBody(matching(body))
- .willReturn(
- aResponse()
- .withStatus(200)
- .withHeader("x-amz-id-2", "Zn8bf8aEFQ+kBnGPBc/JaAf9SoWM68QDPS9+SyFwkIZOHUG2BiRLZi5oXw4cOCEt")
- .withHeader("x-amz-request-id", "5A37448A37622243")
- .withHeader("ETag", "\"" + etag + "\"")
- )
- )
-
- mock.register(
- post(urlEqualTo(s"/$key?uploadId=$uploadId"))
- .withRequestBody(containing("CompleteMultipartUpload"))
- .withRequestBody(containing(etag))
- .willReturn(
- aResponse()
- .withStatus(200)
- .withHeader("Content-Type", "application/xml; charset=UTF-8")
- .withHeader("x-amz-id-2", "Zn8bf8aEFQ+kBnGPBc/JaAf9SoWM68QDPS9+SyFwkIZOHUG2BiRLZi5oXw4cOCEt")
- .withHeader("x-amz-request-id", "5A37448A3762224333")
- .withBody(s"""
- |
- | $url
- | $bucket
- | $key
- | "$etag"
- |""".stripMargin)
- )
- )
-
- val result = Source(ByteString(body) :: Nil)
- .toMat(new S3Client(AWSCredentials("", ""), "us-east-1").multipartUpload(bucket, key))(Keep.right)
- .run
-
- Await.ready(result, 5.seconds).futureValue shouldBe MultipartUploadResult(url, bucket, key, etag)
- }
-
- it should "fail if request returns 404" in {
- val result = new S3Client(AWSCredentials("", ""), "us-east-1")
- .download("sometest4398673", "30000184.xml")
- .map(_.decodeString("utf8"))
- .runWith(Sink.head)
- whenReady(result.failed) { e =>
- e shouldBe a[S3Exception]
- e.asInstanceOf[S3Exception].code should equal("NoSuchKey")
- }
- }
-}
diff --git a/s3/src/test/scala/akka/stream/alpakka/s3/impl/S3SourceSpec.scala b/s3/src/test/scala/akka/stream/alpakka/s3/impl/S3SourceSpec.scala
deleted file mode 100644
index fe3cf411eb..0000000000
--- a/s3/src/test/scala/akka/stream/alpakka/s3/impl/S3SourceSpec.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Copyright (C) 2016-2017 Lightbend Inc.
- */
-package akka.stream.alpakka.s3.impl
-
-import akka.stream.ActorMaterializer
-import akka.stream.alpakka.s3.S3Exception
-import akka.stream.alpakka.s3.auth.AWSCredentials
-import akka.stream.alpakka.s3.scaladsl.S3Client
-import akka.stream.scaladsl.Sink
-import com.github.tomakehurst.wiremock.client.WireMock._
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
-class S3SourceSpec extends WireMockBase {
- implicit val mat = ActorMaterializer()
-
- "S3Source" should "work in a happy case" in {
- val body = "Some content"
- mock
- .register(
- get(urlEqualTo("/testKey")).willReturn(
- aResponse().withStatus(200).withHeader("ETag", """"fba9dede5f27731c9771645a39863328"""").withBody(body)
- )
- )
-
- val result = new S3Client(AWSCredentials("", ""), "us-east-1")
- .download("testBucket", "testKey")
- .map(_.decodeString("utf8"))
- .runWith(Sink.head)
-
- Await.ready(result, 5.seconds).futureValue shouldBe body
- }
-
- it should "fail if request returns 404" in {
- val result = new S3Client(AWSCredentials("", ""), "us-east-1")
- .download("sometest4398673", "30000184.xml")
- .map(_.decodeString("utf8"))
- .runWith(Sink.head)
- whenReady(result.failed) { e =>
- e shouldBe a[S3Exception]
- e.asInstanceOf[S3Exception].code should equal("NoSuchKey")
- }
- }
-}
diff --git a/s3/src/test/scala/akka/stream/alpakka/s3/impl/WireMockBase.scala b/s3/src/test/scala/akka/stream/alpakka/s3/impl/WireMockBase.scala
deleted file mode 100644
index 9f17a9e562..0000000000
--- a/s3/src/test/scala/akka/stream/alpakka/s3/impl/WireMockBase.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Copyright (C) 2016-2017 Lightbend Inc.
- */
-package akka.stream.alpakka.s3.impl
-
-import akka.actor.ActorSystem
-import akka.stream.alpakka.s3.impl.WireMockBase._
-import akka.testkit.TestKit
-import com.github.tomakehurst.wiremock.WireMockServer
-import com.github.tomakehurst.wiremock.client.WireMock
-import com.github.tomakehurst.wiremock.client.WireMock._
-import com.github.tomakehurst.wiremock.core.WireMockConfiguration._
-import com.typesafe.config.ConfigFactory
-import org.scalatest.concurrent.ScalaFutures
-import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}
-
-import scala.concurrent.duration._
-
-object WireMockBase {
- def config(port: Int) = ConfigFactory.parseString(
- s"""
- |akka {
- | loggers = ["akka.testkit.TestEventListener"]
- |
- | ssl-config.trustManager.stores = [
- | {type = "PEM", path = "./s3/src/test/resources/rootCA.crt"}
- | ]
- | stream.alpakka.s3.proxy {
- | host = localhost
- | port = $port
- | }
- |}
- """.stripMargin
- )
-
- def getCallerName(clazz: Class[_]): String = {
- val s = (Thread.currentThread.getStackTrace map (_.getClassName) drop 1)
- .dropWhile(_ matches "(java.lang.Thread|.*WireMockBase.?$)")
- val reduced = s.lastIndexWhere(_ == clazz.getName) match {
- case -1 ⇒ s
- case z ⇒ s drop (z + 1)
- }
- reduced.head.replaceFirst(""".*\.""", "").replaceAll("[^a-zA-Z_0-9]", "_")
- }
-
- def initServer(): WireMockServer = {
- val server = new WireMockServer(
- wireMockConfig()
- .dynamicPort()
- .dynamicHttpsPort()
- .keystorePath("./s3/src/test/resources/keystore.jks")
- .keystorePassword("abcdefg")
- )
- server.start()
- server
- }
-}
-
-abstract class WireMockBase(_system: ActorSystem, _wireMockServer: WireMockServer)
- extends TestKit(_system)
- with FlatSpecLike
- with BeforeAndAfterAll
- with Matchers
- with ScalaFutures {
-
- implicit val defaultPatience =
- PatienceConfig(timeout = 5.seconds, interval = 100.millis)
-
- def this(mock: WireMockServer) =
- this(ActorSystem(getCallerName(getClass), config(mock.httpsPort())), mock)
- def this() = this(initServer())
-
- val mock = new WireMock("localhost", _wireMockServer.port())
-
- override def beforeAll(): Unit =
- mock.register(
- any(anyUrl()).willReturn(
- aResponse()
- .withStatus(404)
- .withBody(
- "NoSuchKey
No key found" +
- "XXXXXXXX"
- )
- )
- )
- override def afterAll(): Unit = _wireMockServer.stop()
-
-}
diff --git a/s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3ClientIntegrationSpec.scala b/s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3ClientIntegrationSpec.scala
new file mode 100644
index 0000000000..13be55f52f
--- /dev/null
+++ b/s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3ClientIntegrationSpec.scala
@@ -0,0 +1,27 @@
+/*
+ * Copyright (C) 2016-2017 Lightbend Inc.
+ */
+package akka.stream.alpakka.s3.scaladsl
+
+import akka.actor.ActorSystem
+import akka.stream.ActorMaterializer
+import akka.stream.alpakka.s3.auth.AWSCredentials
+import org.scalatest.concurrent.{IntegrationPatience, ScalaFutures}
+import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}
+
+trait S3ClientIntegrationSpec
+ extends FlatSpecLike
+ with BeforeAndAfterAll
+ with Matchers
+ with ScalaFutures
+ with IntegrationPatience {
+
+ implicit val system: ActorSystem
+ implicit val materializer = ActorMaterializer()
+
+ //#client
+ val awsCredentials = AWSCredentials(accessKeyId = "my-AWS-access-key-ID", secretAccessKey = "my-AWS-password")
+ val s3Client = new S3Client(credentials = awsCredentials, region = "us-east-1")(system, materializer)
+ //#client
+
+}
diff --git a/s3/src/test/scala/akka/stream/alpakka/s3/impl/S3NoMock.scala b/s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3NoMock.scala
similarity index 94%
rename from s3/src/test/scala/akka/stream/alpakka/s3/impl/S3NoMock.scala
rename to s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3NoMock.scala
index c51925b71d..d027bfaae2 100644
--- a/s3/src/test/scala/akka/stream/alpakka/s3/impl/S3NoMock.scala
+++ b/s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3NoMock.scala
@@ -1,11 +1,11 @@
/*
* Copyright (C) 2016-2017 Lightbend Inc.
*/
-package akka.stream.alpakka.s3.impl
+package akka.stream.alpakka.s3.scaladsl
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
-import akka.stream.alpakka.s3.scaladsl.S3Client
+import akka.stream.alpakka.s3.impl.MetaHeaders
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import org.scalatest.concurrent.ScalaFutures
diff --git a/s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3SinkSpec.scala b/s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3SinkSpec.scala
new file mode 100644
index 0000000000..d216e16d47
--- /dev/null
+++ b/s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3SinkSpec.scala
@@ -0,0 +1,41 @@
+/*
+ * Copyright (C) 2016-2017 Lightbend Inc.
+ */
+package akka.stream.alpakka.s3.scaladsl
+
+import akka.stream.scaladsl.{Sink, Source}
+import akka.util.ByteString
+
+import scala.concurrent.Future
+
+class S3SinkSpec extends S3WireMockBase with S3ClientIntegrationSpec {
+
+ "S3Sink" should "upload a stream of bytes to S3" in {
+
+ mockUpload()
+
+ //#upload
+ val s3Sink: Sink[ByteString, Future[MultipartUploadResult]] = s3Client.multipartUpload(bucket, bucketKey)
+ //#upload
+
+ val result: Future[MultipartUploadResult] = Source.single(ByteString(body)).runWith(s3Sink)
+
+ result.futureValue shouldBe MultipartUploadResult(url, bucket, bucketKey, etag)
+ }
+
+ it should "fail if request returns 404" in {
+
+ mock404s()
+
+ val result = Source
+ .single(ByteString("some contents"))
+ .runWith(s3Client.multipartUpload("nonexisting_bucket", "nonexisting_file.xml"))
+
+ result.failed.futureValue.getMessage should startWith("Can't initiate upload:")
+ }
+
+ override protected def afterAll(): Unit = {
+ super.afterAll()
+ stopWireMockServer()
+ }
+}
diff --git a/s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3SourceSpec.scala b/s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3SourceSpec.scala
new file mode 100644
index 0000000000..4ef2da4412
--- /dev/null
+++ b/s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3SourceSpec.scala
@@ -0,0 +1,47 @@
+/*
+ * Copyright (C) 2016-2017 Lightbend Inc.
+ */
+package akka.stream.alpakka.s3.scaladsl
+
+import akka.NotUsed
+import akka.stream.alpakka.s3.S3Exception
+import akka.stream.scaladsl.{Sink, Source}
+import akka.util.ByteString
+
+import scala.concurrent.Future
+
+class S3SourceSpec extends S3WireMockBase with S3ClientIntegrationSpec {
+
+ "S3Source" should "download a stream of bytes from S3" in {
+
+ mockDownload()
+
+ //#download
+ val s3Source: Source[ByteString, NotUsed] = s3Client.download("testBucket", "testKey")
+ //#download
+
+ val result: Future[String] = s3Source.map(_.utf8String).runWith(Sink.head)
+
+ result.futureValue shouldBe body
+ }
+
+ it should "fail if request returns 404" in {
+
+ mock404s()
+
+ val result = s3Client
+ .download("nonexisting_bucket", "nonexisting_file.xml")
+ .map(_.utf8String)
+ .runWith(Sink.head)
+
+ whenReady(result.failed) { e =>
+ e shouldBe a[S3Exception]
+ e.asInstanceOf[S3Exception].code should equal("NoSuchKey")
+ }
+ }
+
+ override protected def afterAll(): Unit = {
+ super.afterAll()
+ stopWireMockServer()
+ }
+}
diff --git a/s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3WireMockBase.scala b/s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3WireMockBase.scala
new file mode 100644
index 0000000000..5c50672123
--- /dev/null
+++ b/s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3WireMockBase.scala
@@ -0,0 +1,140 @@
+/*
+ * Copyright (C) 2016-2017 Lightbend Inc.
+ */
+package akka.stream.alpakka.s3.scaladsl
+
+import akka.actor.ActorSystem
+import akka.testkit.TestKit
+import com.github.tomakehurst.wiremock.WireMockServer
+import com.github.tomakehurst.wiremock.client.WireMock
+import com.github.tomakehurst.wiremock.client.WireMock._
+import com.github.tomakehurst.wiremock.core.WireMockConfiguration._
+import com.typesafe.config.ConfigFactory
+import S3WireMockBase._
+
+abstract class S3WireMockBase(_system: ActorSystem, _wireMockServer: WireMockServer) extends TestKit(_system) {
+
+ def this(mock: WireMockServer) = this(ActorSystem(getCallerName(getClass), config(mock.httpsPort())), mock)
+ def this() = this(initServer())
+
+ val mock = new WireMock("localhost", _wireMockServer.port())
+
+ def mock404s(): Unit =
+ mock.register(
+ any(anyUrl()).willReturn(
+ aResponse()
+ .withStatus(404)
+ .withBody(
+ "NoSuchKey
No key found" +
+ "XXXXXXXX"
+ )
+ )
+ )
+
+ def stopWireMockServer(): Unit = _wireMockServer.stop()
+
+ val body = "Some content"
+ val bucketKey = "testKey"
+ val bucket = "testBucket"
+ val uploadId = "VXBsb2FkIElEIGZvciA2aWWpbmcncyBteS1tb3ZpZS5tMnRzIHVwbG9hZA"
+ val etag = "5b27a21a97fcf8a7004dd1d906e7a5ba"
+ val url = s"http://testbucket.s3.amazonaws.com/testKey"
+
+ def mockDownload(): Unit =
+ mock
+ .register(
+ get(urlEqualTo("/testKey")).willReturn(
+ aResponse().withStatus(200).withHeader("ETag", """"fba9dede5f27731c9771645a39863328"""").withBody(body)
+ )
+ )
+
+ def mockUpload(): Unit = {
+ mock
+ .register(
+ post(urlEqualTo(s"/$bucketKey?uploads")).willReturn(
+ aResponse()
+ .withStatus(200)
+ .withHeader("x-amz-id-2", "Uuag1LuByRx9e6j5Onimru9pO4ZVKnJ2Qz7/C1NPcfTWAtRPfTaOFg==")
+ .withHeader("x-amz-request-id", "656c76696e6727732072657175657374")
+ .withBody(s"""
+ |
+ | $bucket
+ | $bucketKey
+ | $uploadId
+ |""".stripMargin)
+ )
+ )
+
+ mock.register(
+ put(urlEqualTo(s"/$bucketKey?partNumber=1&uploadId=$uploadId"))
+ .withRequestBody(matching(body))
+ .willReturn(
+ aResponse()
+ .withStatus(200)
+ .withHeader("x-amz-id-2", "Zn8bf8aEFQ+kBnGPBc/JaAf9SoWM68QDPS9+SyFwkIZOHUG2BiRLZi5oXw4cOCEt")
+ .withHeader("x-amz-request-id", "5A37448A37622243")
+ .withHeader("ETag", "\"" + etag + "\"")
+ )
+ )
+
+ mock.register(
+ post(urlEqualTo(s"/$bucketKey?uploadId=$uploadId"))
+ .withRequestBody(containing("CompleteMultipartUpload"))
+ .withRequestBody(containing(etag))
+ .willReturn(
+ aResponse()
+ .withStatus(200)
+ .withHeader("Content-Type", "application/xml; charset=UTF-8")
+ .withHeader("x-amz-id-2", "Zn8bf8aEFQ+kBnGPBc/JaAf9SoWM68QDPS9+SyFwkIZOHUG2BiRLZi5oXw4cOCEt")
+ .withHeader("x-amz-request-id", "5A37448A3762224333")
+ .withBody(s"""
+ |
+ | $url
+ | $bucket
+ | $bucketKey
+ | "$etag"
+ |""".stripMargin)
+ )
+ )
+ }
+}
+
+private object S3WireMockBase {
+ def config(port: Int) = ConfigFactory.parseString(
+ s"""
+ |akka {
+ | loggers = ["akka.testkit.TestEventListener"]
+ |
+ | ssl-config.trustManager.stores = [
+ | {type = "PEM", path = "./s3/src/test/resources/rootCA.crt"}
+ | ]
+ | stream.alpakka.s3.proxy {
+ | host = localhost
+ | port = $port
+ | }
+ |}
+ """.stripMargin
+ )
+
+ def getCallerName(clazz: Class[_]): String = {
+ val s = (Thread.currentThread.getStackTrace map (_.getClassName) drop 1)
+ .dropWhile(_ matches "(java.lang.Thread|.*WireMockBase.?$)")
+ val reduced = s.lastIndexWhere(_ == clazz.getName) match {
+ case -1 ⇒ s
+ case z ⇒ s drop (z + 1)
+ }
+ reduced.head.replaceFirst(""".*\.""", "").replaceAll("[^a-zA-Z_0-9]", "_")
+ }
+
+ def initServer(): WireMockServer = {
+ val server = new WireMockServer(
+ wireMockConfig()
+ .dynamicPort()
+ .dynamicHttpsPort()
+ .keystorePath("./s3/src/test/resources/keystore.jks")
+ .keystorePassword("abcdefg")
+ )
+ server.start()
+ server
+ }
+}