From 2638b978e637e743b5240f1d33c1922e60c1ff67 Mon Sep 17 00:00:00 2001
From: Heiko Seeberger
Date: Thu, 23 Mar 2017 17:13:11 +0100
Subject: [PATCH 1/3] Upgrade to scalafmt 0.6.6

---
 project/AutomateScalafmtPlugin.scala | 66 ++++++++++++++++++++++++++++
 project/Common.scala | 10 +----
 project/plugins.sbt | 2 +-
 3 files changed, 69 insertions(+), 9 deletions(-)
 create mode 100644 project/AutomateScalafmtPlugin.scala

diff --git a/project/AutomateScalafmtPlugin.scala b/project/AutomateScalafmtPlugin.scala
new file mode 100644
index 0000000000..92fd54f025
--- /dev/null
+++ b/project/AutomateScalafmtPlugin.scala
@@ -0,0 +1,66 @@
+import org.scalafmt.bootstrap.ScalafmtBootstrap
+import org.scalafmt.sbt.ScalafmtPlugin
+import sbt._
+import sbt.Keys._
+import sbt.inc.Analysis
+
+object AutomateScalafmtPlugin extends AutoPlugin {
+
+  object autoImport {
+    def automateScalafmtFor(configurations: Configuration*): Seq[Setting[_]] =
+      configurations.flatMap { c =>
+        inConfig(c)(
+          Seq(
+            compileInputs.in(compile) := {
+              scalafmtInc.value
+              compileInputs.in(compile).value
+            },
+            sourceDirectories.in(scalafmtInc) := Seq(scalaSource.value),
+            scalafmtInc := {
+              val cache = streams.value.cacheDirectory / "scalafmt"
+              val include = includeFilter.in(scalafmtInc).value
+              val exclude = excludeFilter.in(scalafmtInc).value
+              val sources =
+                sourceDirectories
+                  .in(scalafmtInc)
+                  .value
+                  .descendantsExcept(include, exclude)
+                  .get
+                  .toSet
+              def format(handler: Set[File] => Unit, msg: String) = {
+                def update(handler: Set[File] => Unit, msg: String)(in: ChangeReport[File],
+                                                                    out: ChangeReport[File]) = {
+                  val label = Reference.display(thisProjectRef.value)
+                  val files = in.modified -- in.removed
+                  Analysis
+                    .counted("Scala source", "", "s", files.size)
+                    .foreach(count => streams.value.log.info(s"$msg $count in $label ..."))
+                  handler(files)
+                  files
+                }
+                FileFunction.cached(cache)(FilesInfo.hash, FilesInfo.exists)(update(handler, msg))(
+                  sources
+                )
+              }
+              def formattingHandler(files: Set[File]) =
+                if (files.nonEmpty) {
+                  val filesArg = files.map(_.getAbsolutePath).mkString(",")
+                  ScalafmtBootstrap.main(List("--quiet", "-i", "-f", filesArg))
+                }
+              format(formattingHandler, "Formatting")
+              format(_ => (), "Reformatted") // Recalculate the cache
+            }
+          )
+        )
+      }
+  }
+
+  private val scalafmtInc = taskKey[Unit]("Incrementally format modified sources")
+
+  override def requires = ScalafmtPlugin
+
+  override def trigger = allRequirements
+
+  override def projectSettings =
+    (includeFilter.in(scalafmtInc) := "*.scala") +: autoImport.automateScalafmtFor(Compile, Test)
+}
diff --git a/project/Common.scala b/project/Common.scala
index f15dc5ff13..7c017438c6 100644
--- a/project/Common.scala
+++ b/project/Common.scala
@@ -3,8 +3,6 @@ import sbt.Keys._
 import sbt.plugins.JvmPlugin
 import de.heikoseeberger.sbtheader._
 import de.heikoseeberger.sbtheader.HeaderPlugin.autoImport._
-import org.scalafmt.sbt.ScalaFmtPlugin
-import org.scalafmt.sbt.ScalaFmtPlugin.autoImport._
 
 object Common extends AutoPlugin {
 
@@ -18,7 +16,7 @@ object Common extends AutoPlugin {
 
   override def requires = JvmPlugin && HeaderPlugin
 
-  override lazy val projectSettings = reformatOnCompileSettings ++
+  override lazy val projectSettings =
     Dependencies.Common ++ Seq(
       organization := "com.lightbend.akka",
       organizationName := "Lightbend Inc.",
@@ -61,10 +59,6 @@ object Common extends AutoPlugin {
       headers := headers.value ++ Map(
        "scala" -> FileHeader,
        "java" -> FileHeader
-      ),
-
-      formatSbtFiles := false,
-      scalafmtConfig := Some(baseDirectory.in(ThisBuild).value / ".scalafmt.conf"),
-      ivyScala := ivyScala.value.map(_.copy(overrideScalaVersion = sbtPlugin.value)) // TODO Remove once this workaround no longer needed (https://github.com/sbt/sbt/issues/2786)!
+      )
     )
 }
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 199caa443a..fe977a7c7e 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -1,5 +1,5 @@
 addSbtPlugin("de.heikoseeberger" % "sbt-header" % "1.6.0")
-addSbtPlugin("com.geirsson" % "sbt-scalafmt" % "0.4.10")
+addSbtPlugin("com.geirsson" % "sbt-scalafmt" % "0.6.6")
 addSbtPlugin("com.dwijnand" % "sbt-dynver" % "1.1.1")
 addSbtPlugin("com.lightbend.paradox" % "sbt-paradox" % "0.2.9")
 addSbtPlugin("com.eed3si9n" % "sbt-unidoc" % "0.3.3")
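Note on the incremental formatting above: AutomateScalafmtPlugin hooks scalafmtInc in front of compile and relies on sbt's FileFunction.cached, which re-runs an action only for the sources whose content hash changed since the previous invocation. A minimal standalone sketch of that caching pattern, assuming the sbt 0.13 API used in the patch (the helper name formatChanged is illustrative only, not part of the patch):

  import sbt._

  // Re-run `handler` only for files whose hash changed since the last call.
  // In the plugin above, cacheDir is streams.value.cacheDirectory / "scalafmt"
  // and the handler invokes the scalafmt CLI via ScalafmtBootstrap.main.
  def formatChanged(cacheDir: File, sources: Set[File])(handler: Set[File] => Unit): Set[File] = {
    val cached = FileFunction.cached(cacheDir)(FilesInfo.hash, FilesInfo.exists) {
      (in: ChangeReport[File], _: ChangeReport[File]) =>
        val changed = in.modified -- in.removed // skip deletions; format edits and additions
        handler(changed)
        changed
    }
    cached(sources)
  }

This is also why scalafmtInc calls format twice: format(formattingHandler, "Formatting") rewrites the changed sources, and format(_ => (), "Reformatted") immediately re-hashes the rewritten files so they are not reported as modified again on the next run.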
From 826c6111b6eaaf50665f2333074a442f6afa206f Mon Sep 17 00:00:00 2001
From: Heiko Seeberger
Date: Thu, 23 Mar 2017 17:49:46 +0100
Subject: [PATCH 2/3] Remove deprecated binPack.callSite scalafmt setting

---
 .scalafmt.conf | 1 -
 1 file changed, 1 deletion(-)

diff --git a/.scalafmt.conf b/.scalafmt.conf
index 477b081967..25cbefcfa8 100644
--- a/.scalafmt.conf
+++ b/.scalafmt.conf
@@ -7,4 +7,3 @@ indentOperator = spray
 maxColumn = 120
 rewrite.rules = [RedundantBraces, RedundantParens, SortImports]
 unindentTopLevelOperators = true
-binPack.callSite = true
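Note: binPack.callSite was deprecated in the scalafmt 0.6 line; argument bin-packing at call sites was judged unsafe for some inputs and the option was renamed to an explicitly unsafe-prefixed key. If the old behaviour were still wanted, the setting would presumably read something like the following (key name per the 0.6-era rename; verify against the scalafmt docs before relying on it):

  binPack.unsafeCallSite = true

Simply dropping the option, as this commit does, lets long call sites expand to one argument per line, which is exactly the churn visible in the follow-up formatting commit.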
From 6c0faf96731b20df033ce24e29c5a94fc2c5bccc Mon Sep 17 00:00:00 2001
From: Heiko Seeberger
Date: Wed, 29 Mar 2017 15:58:41 +0200
Subject: [PATCH 3/3] Format with updated scalafmt and fixed settings

---
 .../stream/alpakka/amqp/AmqpSinkStage.scala | 28 ++++---
 .../stream/alpakka/amqp/AmqpSourceStage.scala | 3 +-
 .../amqp/scaladsl/AmqpConnectorsSpec.scala | 12 +--
 .../awslambda/scaladsl/AwsLambdaFlow.scala | 5 +-
 .../scaladsl/AwsLambdaFlowSpec.scala | 13 +--
 .../cassandra/CassandraSourceStage.scala | 26 +++---
 .../cassandra/scaladsl/CassandraSink.scala | 10 +--
 .../alpakka/dynamodb/impl/AwsClient.scala | 7 +-
 .../dynamodb/impl/DynamoClientImpl.scala | 8 +-
 .../dynamodb/impl/DynamoProtocol.scala | 50 ++++++------
 .../stream/alpakka/dynamodb/TestOps.scala | 16 ++--
 .../ftp/impl/FtpBrowserGraphStage.scala | 42 +++++-----
 .../alpakka/ftp/impl/FtpIOGraphStage.scala | 80 ++++++++++---------
 .../pubsub/GooglePubSubSource.scala | 40 +++++-----
 .../alpakka/googlecloud/pubsub/HttpApi.scala | 9 ++-
 .../pubsub/javadsl/GooglePubSub.scala | 23 ++++--
 .../pubsub/scaladsl/GooglePubSub.scala | 37 ++++++---
 .../googlecloud/pubsub/ExampleUsage.scala | 30 ++++---
 .../googlecloud/pubsub/GooglePubSubSpec.scala | 58 ++++++++++----
 .../googlecloud/pubsub/HttpApiSpec.scala | 75 +++++++++++------
 .../googlecloud/pubsub/SessionSpec.scala | 17 ++--
 .../hbase/internal/HBaseCapabilities.scala | 3 +-
 .../hbase/internal/HBaseFlowStage.scala | 6 +-
 .../stream/alpakka/jms/JmsSinkStage.scala | 16 ++--
 .../stream/alpakka/jms/JmsSourceStage.scala | 6 +-
 .../alpakka/mqtt/MqttProducerStage.scala | 25 +++---
 .../stream/alpakka/mqtt/MqttSourceStage.scala | 3 +-
 .../alpakka/mqtt/scaladsl/MqttSinkSpec.scala | 11 ++-
 .../mqtt/scaladsl/MqttSourceSpec.scala | 31 ++++---
 .../akka/stream/alpakka/s3/S3Exception.scala | 6 +-
 .../akka/stream/alpakka/s3/auth/Signer.scala | 5 +-
 .../stream/alpakka/s3/impl/HttpRequests.scala | 3 +-
 .../stream/alpakka/s3/impl/S3Stream.scala | 44 +++++-----
 .../stream/alpakka/s3/auth/SignerSpec.scala | 9 ++-
 .../alpakka/s3/auth/SigningKeySpec.scala | 3 +-
 .../alpakka/s3/impl/DiskBufferSpec.scala | 6 +-
 .../alpakka/s3/impl/MemoryBufferSpec.scala | 6 +-
 .../stream/alpakka/s3/impl/S3SinkSpec.scala | 45 ++++++++---
 .../stream/alpakka/s3/impl/S3SourceSpec.scala | 18 +++-
 .../alpakka/s3/impl/SplitAfterSizeSpec.scala | 9 ++-
 .../stream/alpakka/s3/impl/WireMockBase.scala | 29 ++++---
 .../recordio/scaladsl/RecordIOFraming.scala | 6 +-
 .../recordio/RecordIOFramingSpec.scala | 45 ++++++-----
 .../stream/alpakka/sqs/SqsAckSinkStage.scala | 58 +++++++-------
 .../stream/alpakka/sqs/SqsSinkStage.scala | 36 +++++----
 .../stream/alpakka/sqs/SqsSourceStage.scala | 38 +++++----
 .../alpakka/sqs/scaladsl/SqsAckSink.scala | 3 +-
 .../stream/alpakka/sqs/scaladsl/SqsSink.scala | 3 +-
 .../alpakka/sse/scaladsl/EventSource.scala | 3 +-
 49 files changed, 627 insertions(+), 438 deletions(-)

diff --git a/amqp/src/main/scala/akka/stream/alpakka/amqp/AmqpSinkStage.scala b/amqp/src/main/scala/akka/stream/alpakka/amqp/AmqpSinkStage.scala
index 6fa3d58aa8..4478782675 100644
--- a/amqp/src/main/scala/akka/stream/alpakka/amqp/AmqpSinkStage.scala
+++ b/amqp/src/main/scala/akka/stream/alpakka/amqp/AmqpSinkStage.scala
@@ -57,21 +57,23 @@
       pull(in)
     }
 
-    setHandler(in,
+    setHandler(
+      in,
       new InHandler {
-      override def onPush(): Unit = {
-        val elem = grab(in)
-        channel.basicPublish(
-          exchange,
-          routingKey,
-          elem.mandatory,
-          elem.immediate,
-          elem.props.orNull,
-          elem.bytes.toArray
-        )
-        pull(in)
+        override def onPush(): Unit = {
+          val elem = grab(in)
+          channel.basicPublish(
+            exchange,
+            routingKey,
+            elem.mandatory,
+            elem.immediate,
+            elem.props.orNull,
+            elem.bytes.toArray
+          )
+          pull(in)
+        }
       }
-    })
+    )
 }
diff --git a/amqp/src/main/scala/akka/stream/alpakka/amqp/AmqpSourceStage.scala b/amqp/src/main/scala/akka/stream/alpakka/amqp/AmqpSourceStage.scala
index 18115e2ca6..c57de2bd29 100644
--- a/amqp/src/main/scala/akka/stream/alpakka/amqp/AmqpSourceStage.scala
+++ b/amqp/src/main/scala/akka/stream/alpakka/amqp/AmqpSourceStage.scala
@@ -110,8 +110,7 @@ final class AmqpSourceStage(settings: AmqpSourceSettings, bufferSize: Int)
       }
     }
 
-    setHandler(out,
-               new OutHandler {
+    setHandler(out, new OutHandler {
       override def onPull(): Unit =
         if (queue.nonEmpty) {
           pushAndAckMessage(queue.dequeue())
diff --git a/amqp/src/test/scala/akka/stream/alpakka/amqp/scaladsl/AmqpConnectorsSpec.scala b/amqp/src/test/scala/akka/stream/alpakka/amqp/scaladsl/AmqpConnectorsSpec.scala
index 1b135a68cb..0ca62538f0 100644
--- a/amqp/src/test/scala/akka/stream/alpakka/amqp/scaladsl/AmqpConnectorsSpec.scala
+++ b/amqp/src/test/scala/akka/stream/alpakka/amqp/scaladsl/AmqpConnectorsSpec.scala
@@ -69,10 +69,11 @@ class AmqpConnectorsSpec extends AmqpSpec {
       val merge = b.add(Merge[IncomingMessage](count))
       for (n <- 0 until count) {
         val source = b.add(
-          AmqpSource(
-            NamedQueueSourceSettings(DefaultAmqpConnection, queueName).withDeclarations(queueDeclaration),
-            bufferSize = 1
-          ))
+          AmqpSource(
+            NamedQueueSourceSettings(DefaultAmqpConnection, queueName).withDeclarations(queueDeclaration),
+            bufferSize = 1
+          )
+        )
         source.out ~> merge.in(n)
       }
 
@@ -232,7 +233,8 @@ class AmqpConnectorsSpec extends AmqpSpec {
 
     import system.dispatcher
     system.scheduler.scheduleOnce(5.seconds)(
-      completion.tryFailure(new Error("Did not get at least one element from every fanout branch")))
+      completion.tryFailure(new Error("Did not get at least one element from every fanout branch"))
+    )
 
     Source.repeat("stuff").map(s => ByteString(s)).runWith(amqpSink)
 
diff --git a/awslambda/src/main/scala/akka/stream/alpakka/awslambda/scaladsl/AwsLambdaFlow.scala b/awslambda/src/main/scala/akka/stream/alpakka/awslambda/scaladsl/AwsLambdaFlow.scala
index addab18785..743ef5dd50 100644
--- a/awslambda/src/main/scala/akka/stream/alpakka/awslambda/scaladsl/AwsLambdaFlow.scala
+++ b/awslambda/src/main/scala/akka/stream/alpakka/awslambda/scaladsl/AwsLambdaFlow.scala
@@ -14,8 +14,9 @@ object AwsLambdaFlow {
   /**
    * Scala API: creates a [[AwsLambdaFlowStage]] for a AWS Lambda function invocation using [[AWSLambdaClient]]
    */
-  def apply(parallelism: Int)(
-      implicit awsLambdaClient: AWSLambdaAsyncClient): Flow[InvokeRequest, InvokeResult, NotUsed] =
+  def apply(
+      parallelism: Int
+  )(implicit awsLambdaClient: AWSLambdaAsyncClient): Flow[InvokeRequest, InvokeResult, NotUsed] =
     Flow.fromGraph(new AwsLambdaFlowStage(awsLambdaClient)(parallelism))
 
 }
diff --git a/awslambda/src/test/scala/akka/stream/alpakka/awslambda/scaladsl/AwsLambdaFlowSpec.scala b/awslambda/src/test/scala/akka/stream/alpakka/awslambda/scaladsl/AwsLambdaFlowSpec.scala
index 08f64c08dc..85346b3bea 100644
--- a/awslambda/src/test/scala/akka/stream/alpakka/awslambda/scaladsl/AwsLambdaFlowSpec.scala
+++ b/awslambda/src/test/scala/akka/stream/alpakka/awslambda/scaladsl/AwsLambdaFlowSpec.scala
@@ -53,8 +53,9 @@ class AwsLambdaFlowSpec
 
     "call a single invoke request" in {
 
-      when(awsLambdaClient.invokeAsync(mockitoEq(invokeRequest),
-                                       mockitoAny[AsyncHandler[InvokeRequest, InvokeResult]]())).thenAnswer(new Answer[AnyRef] {
+      when(
+        awsLambdaClient.invokeAsync(mockitoEq(invokeRequest), mockitoAny[AsyncHandler[InvokeRequest, InvokeResult]]())
+      ).thenAnswer(new Answer[AnyRef] {
         override def answer(invocation: InvocationOnMock): AnyRef = {
           invocation.getArgument[AsyncHandler[InvokeRequest, InvokeResult]](1).onSuccess(invokeRequest, invokeResult)
           CompletableFuture.completedFuture(invokeResult)
@@ -67,14 +68,16 @@ class AwsLambdaFlowSpec
       future.map(_ shouldBe invokeResult)
 
       verify(awsLambdaClient, times(1)).invokeAsync(mockitoEq(invokeRequest),
-                                                   mockitoAny[AsyncHandler[InvokeRequest, InvokeResult]]())
+                                                    mockitoAny[AsyncHandler[InvokeRequest, InvokeResult]]())
 
     }
 
     "call with exception" in {
 
-      when(awsLambdaClient.invokeAsync(mockitoAny[InvokeRequest](),
-                                       mockitoAny[AsyncHandler[InvokeRequest, InvokeResult]]())).thenAnswer(new Answer[Future[InvokeResult]] {
+      when(
+        awsLambdaClient.invokeAsync(mockitoAny[InvokeRequest](),
+                                    mockitoAny[AsyncHandler[InvokeRequest, InvokeResult]]())
+      ).thenAnswer(new Answer[Future[InvokeResult]] {
         override def answer(invocation: InvocationOnMock): Future[InvokeResult] = {
           val exception = new RuntimeException("Error in lambda")
           invocation.getArgument[AsyncHandler[InvokeRequest, InvokeResult]](1).onError(exception)
diff --git a/cassandra/src/main/scala/akka/stream/alpakka/cassandra/CassandraSourceStage.scala b/cassandra/src/main/scala/akka/stream/alpakka/cassandra/CassandraSourceStage.scala
index d642d7b075..9e7cad5cdd 100644
--- a/cassandra/src/main/scala/akka/stream/alpakka/cassandra/CassandraSourceStage.scala
+++ b/cassandra/src/main/scala/akka/stream/alpakka/cassandra/CassandraSourceStage.scala
@@ -30,22 +30,24 @@ final class CassandraSourceStage(futStmt: Future[Statement], session: Session) e
       futRs.onComplete(futFetchedCallback.invoke)
     }
 
-    setHandler(out,
+    setHandler(
+      out,
       new OutHandler {
-      override def onPull(): Unit = {
-        implicit val ec = materializer.executionContext
+        override def onPull(): Unit = {
+          implicit val ec = materializer.executionContext
 
-        maybeRs match {
-          case Some(rs) if rs.getAvailableWithoutFetching > 0 => push(out, rs.one())
-          case Some(rs) if rs.isExhausted => completeStage()
-          case Some(rs) =>
-            // fetch next page
-            val futRs = rs.fetchMoreResults().asScala()
-            futRs.onComplete(futFetchedCallback.invoke)
-          case None => () // doing nothing, waiting for futRs in preStart() to be completed
+          maybeRs match {
+            case Some(rs) if rs.getAvailableWithoutFetching > 0 => push(out, rs.one())
+            case Some(rs) if rs.isExhausted => completeStage()
+            case Some(rs) =>
+              // fetch next page
+              val futRs = rs.fetchMoreResults().asScala()
+              futRs.onComplete(futFetchedCallback.invoke)
+            case None => () // doing nothing, waiting for futRs in preStart() to be completed
+          }
         }
       }
-    })
+    )
 
   private def tryPushAfterFetch(rsOrFailure: Try[ResultSet]): Unit = rsOrFailure match {
     case Success(rs) =>
diff --git a/cassandra/src/main/scala/akka/stream/alpakka/cassandra/scaladsl/CassandraSink.scala b/cassandra/src/main/scala/akka/stream/alpakka/cassandra/scaladsl/CassandraSink.scala
index 5326546026..4204173dc9 100644
--- a/cassandra/src/main/scala/akka/stream/alpakka/cassandra/scaladsl/CassandraSink.scala
+++ b/cassandra/src/main/scala/akka/stream/alpakka/cassandra/scaladsl/CassandraSink.scala
@@ -12,11 +12,11 @@ import scala.concurrent.{ExecutionContext, Future}
 import akka.stream.alpakka.cassandra.GuavaFutures._
 
 object CassandraSink {
-  def apply[T](parallelism: Int,
-               statement: PreparedStatement,
-               statementBinder: (T, PreparedStatement) => BoundStatement)(
-      implicit session: Session,
-      ex: ExecutionContext): Sink[T, Future[Done]] =
+  def apply[T](
+      parallelism: Int,
+      statement: PreparedStatement,
+      statementBinder: (T, PreparedStatement) => BoundStatement
+  )(implicit session: Session, ex: ExecutionContext): Sink[T, Future[Done]] =
     Flow[T]
       .mapAsyncUnordered(parallelism)(t ⇒ session.executeAsync(statementBinder(t, statement)).asScala())
       .toMat(Sink.ignore)(Keep.right)
diff --git a/dynamodb/src/main/scala/akka/stream/alpakka/dynamodb/impl/AwsClient.scala b/dynamodb/src/main/scala/akka/stream/alpakka/dynamodb/impl/AwsClient.scala
index d84a02db37..c16e137742 100644
--- a/dynamodb/src/main/scala/akka/stream/alpakka/dynamodb/impl/AwsClient.scala
+++ b/dynamodb/src/main/scala/akka/stream/alpakka/dynamodb/impl/AwsClient.scala
@@ -88,13 +88,16 @@ private[alpakka] trait AwsClient[S <: ClientSettings] {
           val amzHeaders = original.getHeaders
           val body = read(original.getContent)
 
-          val httpr = HttpRequest(uri = signableUrl, method = original.getHttpMethod,
+          val httpr = HttpRequest(
+            uri = signableUrl,
+            method = original.getHttpMethod,
             headers = List(
               headers.RawHeader("x-amz-date", amzHeaders.get("X-Amz-Date")),
               headers.RawHeader("authorization", amzHeaders.get("Authorization")),
               headers.RawHeader("x-amz-target", amzHeaders.get("X-Amz-Target"))
             ),
-            entity = HttpEntity(defaultContentType, body))
+            entity = HttpEntity(defaultContentType, body)
+          )
 
           httpr -> AwsRequestMetadata(requestId.getAndIncrement(), s)
         }
diff --git a/dynamodb/src/main/scala/akka/stream/alpakka/dynamodb/impl/DynamoClientImpl.scala b/dynamodb/src/main/scala/akka/stream/alpakka/dynamodb/impl/DynamoClientImpl.scala
index c3e90e9253..aba7b00939 100644
--- a/dynamodb/src/main/scala/akka/stream/alpakka/dynamodb/impl/DynamoClientImpl.scala
+++ b/dynamodb/src/main/scala/akka/stream/alpakka/dynamodb/impl/DynamoClientImpl.scala
@@ -14,10 +14,10 @@ import akka.stream.scaladsl.{Sink, Source}
 import com.amazonaws.AmazonServiceException
 import com.amazonaws.http.HttpResponseHandler
 
-class DynamoClientImpl(val settings: DynamoSettings,
-                       val errorResponseHandler: HttpResponseHandler[AmazonServiceException])(
-    implicit protected val system: ActorSystem,
-    implicit protected val materializer: ActorMaterializer)
+class DynamoClientImpl(
+    val settings: DynamoSettings,
+    val errorResponseHandler: HttpResponseHandler[AmazonServiceException]
+)(implicit protected val system: ActorSystem, implicit protected val materializer: ActorMaterializer)
     extends AwsClient[DynamoSettings] {
 
   override protected val service = "dynamodb"
diff --git a/dynamodb/src/main/scala/akka/stream/alpakka/dynamodb/impl/DynamoProtocol.scala b/dynamodb/src/main/scala/akka/stream/alpakka/dynamodb/impl/DynamoProtocol.scala
index ac1d2d4e71..1d9c881e23 100644
--- a/dynamodb/src/main/scala/akka/stream/alpakka/dynamodb/impl/DynamoProtocol.scala
+++ b/dynamodb/src/main/scala/akka/stream/alpakka/dynamodb/impl/DynamoProtocol.scala
@@ -14,30 +14,32 @@ private[alpakka] trait DynamoProtocol {
   val meta = new JsonOperationMetadata().withPayloadJson(true)
 
   val protocol: SdkJsonProtocolFactory = new SdkJsonProtocolFactory(
-    new JsonClientMetadata()
-      .addAllErrorMetadata(
-        new JsonErrorShapeMetadata()
-          .withErrorCode("ItemCollectionSizeLimitExceededException")
-          .withModeledClass(classOf[ItemCollectionSizeLimitExceededException]),
-        new JsonErrorShapeMetadata()
-          .withErrorCode("ResourceInUseException")
-          .withModeledClass(classOf[ResourceInUseException]),
-        new JsonErrorShapeMetadata()
-          .withErrorCode("ResourceNotFoundException")
-          .withModeledClass(classOf[ResourceNotFoundException]),
-        new JsonErrorShapeMetadata()
-          .withErrorCode("ProvisionedThroughputExceededException")
-          .withModeledClass(classOf[ProvisionedThroughputExceededException]),
-        new JsonErrorShapeMetadata()
-          .withErrorCode("ConditionalCheckFailedException")
-          .withModeledClass(classOf[ConditionalCheckFailedException]),
-        new JsonErrorShapeMetadata()
-          .withErrorCode("InternalServerError")
-          .withModeledClass(classOf[InternalServerErrorException]),
-        new JsonErrorShapeMetadata()
-          .withErrorCode("LimitExceededException")
-          .withModeledClass(classOf[LimitExceededException]))
-      .withBaseServiceExceptionClass(classOf[com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException]))
+    new JsonClientMetadata()
+      .addAllErrorMetadata(
+        new JsonErrorShapeMetadata()
+          .withErrorCode("ItemCollectionSizeLimitExceededException")
+          .withModeledClass(classOf[ItemCollectionSizeLimitExceededException]),
+        new JsonErrorShapeMetadata()
+          .withErrorCode("ResourceInUseException")
+          .withModeledClass(classOf[ResourceInUseException]),
+        new JsonErrorShapeMetadata()
+          .withErrorCode("ResourceNotFoundException")
+          .withModeledClass(classOf[ResourceNotFoundException]),
+        new JsonErrorShapeMetadata()
+          .withErrorCode("ProvisionedThroughputExceededException")
+          .withModeledClass(classOf[ProvisionedThroughputExceededException]),
+        new JsonErrorShapeMetadata()
+          .withErrorCode("ConditionalCheckFailedException")
+          .withModeledClass(classOf[ConditionalCheckFailedException]),
+        new JsonErrorShapeMetadata()
+          .withErrorCode("InternalServerError")
+          .withModeledClass(classOf[InternalServerErrorException]),
+        new JsonErrorShapeMetadata()
+          .withErrorCode("LimitExceededException")
+          .withModeledClass(classOf[LimitExceededException])
+      )
+      .withBaseServiceExceptionClass(classOf[com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException])
+  )
 
   val errorResponseHandler: HttpResponseHandler[AmazonServiceException] =
     protocol.createErrorResponseHandler(new JsonErrorResponseMetadata())
diff --git a/dynamodb/src/test/scala/akka/stream/alpakka/dynamodb/TestOps.scala b/dynamodb/src/test/scala/akka/stream/alpakka/dynamodb/TestOps.scala
index 6751f334ca..15aa74b0f0 100644
--- a/dynamodb/src/test/scala/akka/stream/alpakka/dynamodb/TestOps.scala
+++ b/dynamodb/src/test/scala/akka/stream/alpakka/dynamodb/TestOps.scala
@@ -66,11 +66,14 @@ object ItemSpecOps extends TestOps {
     new PutItemRequest().withTableName(tableName).withItem((keyMap("A", 1) + ("data" -> S(test5Data))).asJava)
 
   val batchWriteItemRequest = new BatchWriteItemRequest().withRequestItems(
-    Map(
-      tableName ->
-        List(new WriteRequest(new PutRequest().withItem((keyMap("B", 0) + ("data" -> S(test5Data))).asJava)),
-             new WriteRequest(new PutRequest().withItem((keyMap("B",
-                                                                1) + ("data" -> S(test5Data))).asJava))).asJava).asJava)
+    Map(
+      tableName ->
+        List(
+          new WriteRequest(new PutRequest().withItem((keyMap("B", 0) + ("data" -> S(test5Data))).asJava)),
+          new WriteRequest(new PutRequest().withItem((keyMap("B", 1) + ("data" -> S(test5Data))).asJava))
+        ).asJava
+    ).asJava
+  )
 
   val deleteItemRequest = new DeleteItemRequest().withTableName(tableName).withKey(keyMap("A", 0).asJava)
 
@@ -102,7 +105,8 @@ object TableSpecOps extends TestOps {
   val updateTableRequest = new UpdateTableRequest()
     .withTableName(tableName)
     .withProvisionedThroughput(
-      new ProvisionedThroughput().withWriteCapacityUnits(newMaxLimit).withReadCapacityUnits(newMaxLimit))
+      new ProvisionedThroughput().withWriteCapacityUnits(newMaxLimit).withReadCapacityUnits(newMaxLimit)
+    )
 
   val deleteTableRequest = common.deleteTableRequest
 
diff --git a/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/FtpBrowserGraphStage.scala b/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/FtpBrowserGraphStage.scala
index 93dc1028ec..75b49f9e34 100644
--- a/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/FtpBrowserGraphStage.scala
+++ b/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/FtpBrowserGraphStage.scala
@@ -32,27 +32,29 @@ private[ftp] trait FtpBrowserGraphStage[FtpClient, S <: RemoteFileSettings] exte
 
     private[this] var buffer: Seq[FtpFile] = Seq.empty[FtpFile]
 
-    setHandler(out,
+    setHandler(
+      out,
       new OutHandler {
-      def onPull(): Unit = {
-        fillBuffer()
-        buffer match {
-          case head +: tail =>
-            buffer = tail
-            push(out, head)
-          case _ => finalize()
-        }
-        def finalize() = try { disconnect() } finally { complete(out) }
-      } // end of onPull
+        def onPull(): Unit = {
+          fillBuffer()
+          buffer match {
+            case head +: tail =>
+              buffer = tail
+              push(out, head)
+            case _ => finalize()
+          }
+          def finalize() = try { disconnect() } finally { complete(out) }
+        } // end of onPull
 
-      override def onDownstreamFinish(): Unit =
-        try {
-          disconnect()
-        } finally {
-          matSuccess()
-          super.onDownstreamFinish()
-        }
-    }) // end of handler
+        override def onDownstreamFinish(): Unit =
+          try {
+            disconnect()
+          } finally {
+            matSuccess()
+            super.onDownstreamFinish()
+          }
+      }
+    ) // end of handler
 
     protected[this] def doPreStart(): Unit =
       buffer = initBuffer(basePath)
diff --git a/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/FtpIOGraphStage.scala b/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/FtpIOGraphStage.scala
index caa6728dc5..c08cc974ff 100644
--- a/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/FtpIOGraphStage.scala
+++ b/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/FtpIOGraphStage.scala
@@ -48,31 +48,33 @@ private[ftp] trait FtpIOSourceStage[FtpClient, S <: RemoteFileSettings]
     private[this] var isOpt: Option[InputStream] = None
     private[this] var readBytesTotal: Long = 0L
 
-    setHandler(out,
+    setHandler(
+      out,
       new OutHandler {
-      def onPull(): Unit =
-        readChunk() match {
-          case Some(bs) =>
-            push(out, bs)
-          case None =>
-            try {
-              isOpt.foreach(_.close())
-              disconnect()
-            } finally {
-              matSuccess()
-              complete(out)
-            }
-        }
-
-      override def onDownstreamFinish(): Unit =
-        try {
-          isOpt.foreach(_.close())
-          disconnect()
-        } finally {
-          matSuccess()
-          super.onDownstreamFinish()
-        }
-    }) // end of handler
+        def onPull(): Unit =
+          readChunk() match {
+            case Some(bs) =>
+              push(out, bs)
+            case None =>
+              try {
+                isOpt.foreach(_.close())
+                disconnect()
+              } finally {
+                matSuccess()
+                complete(out)
+              }
+          }
+
+        override def onDownstreamFinish(): Unit =
+          try {
+            isOpt.foreach(_.close())
+            disconnect()
+          } finally {
+            matSuccess()
+            super.onDownstreamFinish()
+          }
+      }
+    ) // end of handler
 
   override def postStop(): Unit =
     try {
@@ -129,22 +131,24 @@ private[ftp] trait FtpIOSinkStage[FtpClient, S <: RemoteFileSettings]
     private[this] var osOpt: Option[OutputStream] = None
     private[this] var writtenBytesTotal: Long = 0L
 
-    setHandler(in,
+    setHandler(
+      in,
       new InHandler {
-      override def onPush(): Unit = {
-        write(grab(in))
-        pull(in)
-      }
-
-      override def onUpstreamFinish(): Unit =
-        try {
-          osOpt.foreach(_.close())
-          disconnect()
-        } finally {
-          matSuccess()
-          super.onUpstreamFinish()
+        override def onPush(): Unit = {
+          write(grab(in))
+          pull(in)
         }
-    }) // end of handler
+
+        override def onUpstreamFinish(): Unit =
+          try {
+            osOpt.foreach(_.close())
+            disconnect()
+          } finally {
+            matSuccess()
+            super.onUpstreamFinish()
+          }
+      }
+    ) // end of handler
 
   override def postStop(): Unit =
     try {
diff --git a/google-cloud-pub-sub/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/GooglePubSubSource.scala b/google-cloud-pub-sub/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/GooglePubSubSource.scala
index 0369ae2efd..8b24d0ebd1 100644
--- a/google-cloud-pub-sub/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/GooglePubSubSource.scala
+++ b/google-cloud-pub-sub/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/GooglePubSubSource.scala
@@ -59,26 +59,28 @@ private final class GooglePubSubSource(projectId: String,
       }
     }
 
-    setHandler(out,
+    setHandler(
+      out,
       new OutHandler {
-      override def onPull(): Unit =
-        state match {
-          case Pending =>
-            state = Fetching
-            fetch(materializer)
-          case Fetching =>
-          // do nothing we will push on request result
-          case HoldingMessages(xs) =>
-            xs match {
-              case head :: tail =>
-                state = HoldingMessages(tail)
-                push(out, head)
-              case Nil =>
-                state = Fetching
-                fetch(materializer)
-            }
-        }
-    })
+        override def onPull(): Unit =
+          state match {
+            case Pending =>
+              state = Fetching
+              fetch(materializer)
+            case Fetching =>
+            // do nothing we will push on request result
+            case HoldingMessages(xs) =>
+              xs match {
+                case head :: tail =>
+                  state = HoldingMessages(tail)
+                  push(out, head)
+                case Nil =>
+                  state = Fetching
+                  fetch(materializer)
+              }
+          }
+      }
+    )
   }
 }
diff --git a/google-cloud-pub-sub/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/HttpApi.scala b/google-cloud-pub-sub/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/HttpApi.scala
index d5214e2c2a..48a0e56b72 100644
--- a/google-cloud-pub-sub/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/HttpApi.scala
+++ b/google-cloud-pub-sub/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/HttpApi.scala
@@ -49,7 +49,8 @@ private trait HttpApi {
 
   def pull(project: String, subscription: String, accessToken: String, apiKey: String)(
       implicit as: ActorSystem,
-      materializer: Materializer): Future[PullResponse] = {
+      materializer: Materializer
+  ): Future[PullResponse] = {
     import materializer.executionContext
 
     val uri: Uri = s"$PubSubGoogleApisHost/v1/projects/$project/subscriptions/$subscription:pull?key=$apiKey"
s"$PubSubGoogleApisHost/v1/projects/$project/subscriptions/$subscription:pull?key=$apiKey" @@ -88,7 +89,8 @@ private trait HttpApi { def publish(project: String, topic: String, accessToken: String, apiKey: String, request: PublishRequest)( implicit as: ActorSystem, - materializer: Materializer): Future[immutable.Seq[String]] = { + materializer: Materializer + ): Future[immutable.Seq[String]] = { import materializer.executionContext val url: Uri = s"$PubSubGoogleApisHost/v1/projects/$project/topics/$topic:publish?key=$apiKey" @@ -102,7 +104,8 @@ private trait HttpApi { def getAccessToken(clientEmail: String, privateKey: PrivateKey, when: Instant)( implicit as: ActorSystem, - materializer: Materializer): Future[AccessTokenExpiry] = { + materializer: Materializer + ): Future[AccessTokenExpiry] = { import materializer.executionContext val expiresAt = when.getEpochSecond + 3600 val request = buildAuthRequest(clientEmail, when.getEpochSecond, expiresAt, privateKey) diff --git a/google-cloud-pub-sub/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/javadsl/GooglePubSub.scala b/google-cloud-pub-sub/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/javadsl/GooglePubSub.scala index 7e6a3a694d..19159adc79 100644 --- a/google-cloud-pub-sub/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/javadsl/GooglePubSub.scala +++ b/google-cloud-pub-sub/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/javadsl/GooglePubSub.scala @@ -27,8 +27,12 @@ object GooglePubSub { actorSystem: ActorSystem, materializer: Materializer): Flow[PublishRequest, java.util.List[String], NotUsed] = GPubSub - .publish(projectId = projectId, apiKey = apiKey, clientEmail = clientEmail, privateKey = privateKey, - topic = topic, parallelism = parallelism)(actorSystem, materializer) + .publish(projectId = projectId, + apiKey = apiKey, + clientEmail = clientEmail, + privateKey = privateKey, + topic = topic, + parallelism = parallelism)(actorSystem, materializer) .map(_.asJava) .asJava @@ -40,8 +44,11 @@ object GooglePubSub { actorSystem: ActorSystem, materializer: Materializer): Source[ReceivedMessage, NotUsed] = GPubSub - .subscribe(projectId = projectId, apiKey = apiKey, clientEmail = clientEmail, privateKey = privateKey, - subscription = subscription)(actorSystem, materializer) + .subscribe(projectId = projectId, + apiKey = apiKey, + clientEmail = clientEmail, + privateKey = privateKey, + subscription = subscription)(actorSystem, materializer) .asJava def acknowledge(projectId: String, @@ -53,8 +60,12 @@ object GooglePubSub { actorSystem: ActorSystem, materializer: Materializer): Sink[AcknowledgeRequest, CompletionStage[Done]] = GPubSub - .acknowledge(projectId = projectId, apiKey = apiKey, clientEmail = clientEmail, privateKey = privateKey, - subscription = subscription, parallelism = parallelism)(actorSystem, materializer) + .acknowledge(projectId = projectId, + apiKey = apiKey, + clientEmail = clientEmail, + privateKey = privateKey, + subscription = subscription, + parallelism = parallelism)(actorSystem, materializer) .mapMaterializedValue(_.toJava) .asJava } diff --git a/google-cloud-pub-sub/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/scaladsl/GooglePubSub.scala b/google-cloud-pub-sub/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/scaladsl/GooglePubSub.scala index 7656a985da..38f9950446 100644 --- a/google-cloud-pub-sub/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/scaladsl/GooglePubSub.scala +++ 
@@ -37,7 +37,8 @@ protected[pubsub] trait GooglePubSub {
               topic: String,
               parallelism: Int = 1)(
       implicit actorSystem: ActorSystem,
-      materializer: Materializer): Flow[PublishRequest, immutable.Seq[String], NotUsed] = {
+      materializer: Materializer
+  ): Flow[PublishRequest, immutable.Seq[String], NotUsed] = {
     import materializer.executionContext
 
     val session = getSession(clientEmail, privateKey)
@@ -50,27 +51,37 @@ protected[pubsub] trait GooglePubSub {
 
   def subscribe(projectId: String, apiKey: String, clientEmail: String, privateKey: PrivateKey, subscription: String)(
       implicit actorSystem: ActorSystem,
-      materializer: Materializer): Source[ReceivedMessage, NotUsed] = {
+      materializer: Materializer
+  ): Source[ReceivedMessage, NotUsed] = {
     val session = getSession(clientEmail, privateKey)
 
-    Source.fromGraph(new GooglePubSubSource(projectId = projectId, apiKey = apiKey, session = session,
-                                            subscription = subscription, httpApi = httpApi))
+    Source.fromGraph(
+      new GooglePubSubSource(projectId = projectId,
+                             apiKey = apiKey,
+                             session = session,
+                             subscription = subscription,
+                             httpApi = httpApi)
+    )
   }
 
-  def acknowledge(projectId: String,
-                  apiKey: String,
-                  clientEmail: String,
-                  privateKey: PrivateKey,
-                  subscription: String,
-                  parallelism: Int = 1)(implicit actorSystem: ActorSystem,
-                                        materializer: Materializer): Sink[AcknowledgeRequest, Future[Done]] = {
+  def acknowledge(
+      projectId: String,
+      apiKey: String,
+      clientEmail: String,
+      privateKey: PrivateKey,
+      subscription: String,
+      parallelism: Int = 1
+  )(implicit actorSystem: ActorSystem, materializer: Materializer): Sink[AcknowledgeRequest, Future[Done]] = {
     import materializer.executionContext
     val session = getSession(clientEmail, privateKey)
 
     Flow[AcknowledgeRequest]
       .mapAsyncUnordered(parallelism) { ackReq =>
         session.getToken().flatMap { accessToken =>
-          httpApi.acknowledge(project = projectId, subscription = subscription, accessToken = accessToken,
-                              apiKey = apiKey, request = ackReq)
+          httpApi.acknowledge(project = projectId,
+                              subscription = subscription,
+                              accessToken = accessToken,
+                              apiKey = apiKey,
+                              request = ackReq)
         }
       }
       .toMat(Sink.ignore)(Keep.right)
diff --git a/google-cloud-pub-sub/src/test/scala/akka/stream/alpakka/googlecloud/pubsub/ExampleUsage.scala b/google-cloud-pub-sub/src/test/scala/akka/stream/alpakka/googlecloud/pubsub/ExampleUsage.scala
index 5d24cc4e97..8561f04697 100644
--- a/google-cloud-pub-sub/src/test/scala/akka/stream/alpakka/googlecloud/pubsub/ExampleUsage.scala
+++ b/google-cloud-pub-sub/src/test/scala/akka/stream/alpakka/googlecloud/pubsub/ExampleUsage.scala
@@ -27,15 +27,15 @@ class ExampleUsage {
   //#init-credentials
   val privateKey: PrivateKey = {
     val pk = "MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQCxwdLoCIviW0BsREeKzi" +
-      "qiSgzl17Q6nD4RhqbB71oPGG8h82EJPeIlLQsMGEtuig0MVsUa9MudewFuQ/XHWtxnueQ3I900EJm" +
-      "rDTA4ysgHcVvyDBPuYdVVV7LE/9nysuHb2x3bh057Sy60qZqDS2hV9ybOBp2RIEK04k/hQDDqp+Lx" +
-      "cnNQBi5C0f6aohTN6Ced2vvTY6hWbgFDk4Hdw9JDJpf8TSx/ZxJxPd3EA58SgXRBuamVZWy1IVpFO" +
-      "SKUCr4wwMOrELu9mRGzmNJiLSqn1jqJlG97ogth3dEldSOtwlfVI1M4sDe3k1SnF1+IagfK7Wda5h" +
-      "PbMdbh2my3EMGY159ktbtTAUzJejPQfhVzk84XNxVPdjN01xN2iceXSKcJHzy8iy9JHb+t9qIIcYk" +
-      "ZPJrBCyphUGlMWE+MFwtjbHMBxhqJNyG0TYByWudF+/QRFaz0FsMr4TmksNmoLPBZTo8zAoGBAKZI" +
-      "vf5XBlTqd/tR4cnTBQOeeegTHT5x7e+W0mfpCo/gDDmKnOsF2lAwj/F/hM5WqorHoM0ibno+0zUb5" +
-      "q6rhccAm511h0LmV1taVkbWk4UReuPuN+UyVUP+IjmXjagDle9IkOE7+fDlNb+Q7BHl2R8zm1jZjE" +
-      "DwM2NQnSxQ22+/"
"DwM2NQnSxQ22+/" + "qiSgzl17Q6nD4RhqbB71oPGG8h82EJPeIlLQsMGEtuig0MVsUa9MudewFuQ/XHWtxnueQ3I900EJm" + + "rDTA4ysgHcVvyDBPuYdVVV7LE/9nysuHb2x3bh057Sy60qZqDS2hV9ybOBp2RIEK04k/hQDDqp+Lx" + + "cnNQBi5C0f6aohTN6Ced2vvTY6hWbgFDk4Hdw9JDJpf8TSx/ZxJxPd3EA58SgXRBuamVZWy1IVpFO" + + "SKUCr4wwMOrELu9mRGzmNJiLSqn1jqJlG97ogth3dEldSOtwlfVI1M4sDe3k1SnF1+IagfK7Wda5h" + + "PbMdbh2my3EMGY159ktbtTAUzJejPQfhVzk84XNxVPdjN01xN2iceXSKcJHzy8iy9JHb+t9qIIcYk" + + "ZPJrBCyphUGlMWE+MFwtjbHMBxhqJNyG0TYByWudF+/QRFaz0FsMr4TmksNmoLPBZTo8zAoGBAKZI" + + "vf5XBlTqd/tR4cnTBQOeeegTHT5x7e+W0mfpCo/gDDmKnOsF2lAwj/F/hM5WqorHoM0ibno+0zUb5" + + "q6rhccAm511h0LmV1taVkbWk4UReuPuN+UyVUP+IjmXjagDle9IkOE7+fDlNb+Q7BHl2R8zm1jZjE" + + "DwM2NQnSxQ22+/" val kf = KeyFactory.getInstance("RSA") val encodedPv = Base64.getDecoder.decode(pk) val keySpecPv = new PKCS8EncodedKeySpec(encodedPv) @@ -73,11 +73,15 @@ class ExampleUsage { val ackSink: Sink[AcknowledgeRequest, Future[Done]] = GooglePubSub.acknowledge(projectId, apiKey, clientEmail, privateKey, subscription) - subscriptionSource.map { message => - // do something fun + subscriptionSource + .map { message => + // do something fun - message.ackId - }.groupedWithin(1000, 1.minute).map(AcknowledgeRequest.apply).to(ackSink) + message.ackId + } + .groupedWithin(1000, 1.minute) + .map(AcknowledgeRequest.apply) + .to(ackSink) //#subscribe //#subscribe-auto-ack diff --git a/google-cloud-pub-sub/src/test/scala/akka/stream/alpakka/googlecloud/pubsub/GooglePubSubSpec.scala b/google-cloud-pub-sub/src/test/scala/akka/stream/alpakka/googlecloud/pubsub/GooglePubSubSpec.scala index 97ac69c565..1111d6dddc 100644 --- a/google-cloud-pub-sub/src/test/scala/akka/stream/alpakka/googlecloud/pubsub/GooglePubSubSpec.scala +++ b/google-cloud-pub-sub/src/test/scala/akka/stream/alpakka/googlecloud/pubsub/GooglePubSubSpec.scala @@ -40,16 +40,26 @@ class GooglePubSubSpec extends FlatSpec with MockitoSugar with ScalaFutures with } it should "auth and publish the message" in new Fixtures { - val flow = googlePubSub.publish(projectId = TestCredentials.projectId, apiKey = TestCredentials.apiKey, - clientEmail = TestCredentials.clientEmail, privateKey = TestCredentials.privateKey, topic = "topic1") + val flow = googlePubSub.publish( + projectId = TestCredentials.projectId, + apiKey = TestCredentials.apiKey, + clientEmail = TestCredentials.clientEmail, + privateKey = TestCredentials.privateKey, + topic = "topic1" + ) val request = PublishRequest(Seq(PubSubMessage(messageId = "1", data = base64String("Hello Google!")))) val source = Source(List(request)) when(auth.getToken()).thenReturn(Future.successful("ok")) - when(mockHttpApi.publish(project = TestCredentials.projectId, topic = "topic1", accessToken = "ok", - apiKey = TestCredentials.apiKey, request = request)).thenReturn(Future.successful(Seq("id1"))) + when( + mockHttpApi.publish(project = TestCredentials.projectId, + topic = "topic1", + accessToken = "ok", + apiKey = TestCredentials.apiKey, + request = request) + ).thenReturn(Future.successful(Seq("id1"))) val result = source.via(flow).runWith(Sink.seq) @@ -61,11 +71,20 @@ class GooglePubSubSpec extends FlatSpec with MockitoSugar with ScalaFutures with ReceivedMessage(ackId = "1", message = PubSubMessage(messageId = "1", data = base64String("Hello Google!"))) when(auth.getToken()).thenReturn(Future.successful("ok")) - when(mockHttpApi.pull(project = TestCredentials.projectId, subscription = "sub1", apiKey = TestCredentials.apiKey, - accessToken = "ok")).thenReturn(Future.successful(PullResponse(receivedMessages = 
+    when(
+      mockHttpApi.pull(project = TestCredentials.projectId,
+                       subscription = "sub1",
+                       apiKey = TestCredentials.apiKey,
+                       accessToken = "ok")
+    ).thenReturn(Future.successful(PullResponse(receivedMessages = Some(Seq(message)))))
 
-    val source = googlePubSub.subscribe(projectId = TestCredentials.projectId, apiKey = TestCredentials.apiKey,
-                                        clientEmail = TestCredentials.clientEmail, privateKey = TestCredentials.privateKey, subscription = "sub1")
+    val source = googlePubSub.subscribe(
+      projectId = TestCredentials.projectId,
+      apiKey = TestCredentials.apiKey,
+      clientEmail = TestCredentials.clientEmail,
+      privateKey = TestCredentials.privateKey,
+      subscription = "sub1"
+    )
 
     val result = source.take(1).runWith(Sink.seq)
 
@@ -74,12 +93,21 @@ class GooglePubSubSpec extends FlatSpec with MockitoSugar with ScalaFutures with
 
   it should "auth and acknowledge a message" in new Fixtures {
     when(auth.getToken()).thenReturn(Future.successful("ok"))
-    when(mockHttpApi.acknowledge(project = TestCredentials.projectId, subscription = "sub1",
-                                 apiKey = TestCredentials.apiKey, accessToken = "ok", request = AcknowledgeRequest(ackIds = Seq("a1"))))
-      .thenReturn(Future.successful(()))
+    when(
+      mockHttpApi.acknowledge(project = TestCredentials.projectId,
+                              subscription = "sub1",
+                              apiKey = TestCredentials.apiKey,
+                              accessToken = "ok",
+                              request = AcknowledgeRequest(ackIds = Seq("a1")))
+    ).thenReturn(Future.successful(()))
 
-    val sink = googlePubSub.acknowledge(projectId = TestCredentials.projectId, apiKey = TestCredentials.apiKey,
-                                        clientEmail = TestCredentials.clientEmail, privateKey = TestCredentials.privateKey, subscription = "sub1")
+    val sink = googlePubSub.acknowledge(
+      projectId = TestCredentials.projectId,
+      apiKey = TestCredentials.apiKey,
+      clientEmail = TestCredentials.clientEmail,
+      privateKey = TestCredentials.privateKey,
+      subscription = "sub1"
+    )
 
     val source = Source(List(AcknowledgeRequest(List("a1"))))
 
diff --git a/google-cloud-pub-sub/src/test/scala/akka/stream/alpakka/googlecloud/pubsub/HttpApiSpec.scala b/google-cloud-pub-sub/src/test/scala/akka/stream/alpakka/googlecloud/pubsub/HttpApiSpec.scala
index a9194ed323..2e95f58010 100644
--- a/google-cloud-pub-sub/src/test/scala/akka/stream/alpakka/googlecloud/pubsub/HttpApiSpec.scala
+++ b/google-cloud-pub-sub/src/test/scala/akka/stream/alpakka/googlecloud/pubsub/HttpApiSpec.scala
@@ -56,9 +56,10 @@ class HttpApiSpec extends FlatSpec with BeforeAndAfterAll with ScalaFutures with
       |}""".stripMargin
 
     mock.register(
-      any(urlEqualTo("/oauth2/v4/token"))
-        .withRequestBody(WireMock.equalTo(expectedRequest))
-        .willReturn(aResponse().withStatus(200).withBody(authResult).withHeader("Content-Type", "application/json")))
+      any(urlEqualTo("/oauth2/v4/token"))
+        .withRequestBody(WireMock.equalTo(expectedRequest))
+        .willReturn(aResponse().withStatus(200).withBody(authResult).withHeader("Content-Type", "application/json"))
+    )
 
     val result =
       TestHttpApi.getAccessToken(TestCredentials.clientEmail, TestCredentials.privateKey, Instant.ofEpochSecond(0))
@@ -77,15 +78,19 @@ class HttpApiSpec extends FlatSpec with BeforeAndAfterAll with ScalaFutures with
     val publishResponse = """{"messageIds":["1"]}"""
 
     mock.register(
-      WireMock
-        .post(urlEqualTo(
-          s"/v1/projects/${TestCredentials.projectId}/topics/topic1:publish?key=${TestCredentials.apiKey}"))
-        .withRequestBody(WireMock.equalTo(expectedPublishRequest))
-        .withHeader("Authorization", WireMock.equalTo("Bearer " + accessToken))
-        .willReturn(aResponse()
-          .withStatus(200)
-          .withBody(publishResponse)
-          .withHeader("Content-Type", "application/json")))
+      WireMock
+        .post(
+          urlEqualTo(s"/v1/projects/${TestCredentials.projectId}/topics/topic1:publish?key=${TestCredentials.apiKey}")
+        )
+        .withRequestBody(WireMock.equalTo(expectedPublishRequest))
+        .withHeader("Authorization", WireMock.equalTo("Bearer " + accessToken))
+        .willReturn(
+          aResponse()
+            .withStatus(200)
+            .withBody(publishResponse)
+            .withHeader("Content-Type", "application/json")
+        )
+    )
 
     val result =
       TestHttpApi.publish(TestCredentials.projectId, "topic1", accessToken, TestCredentials.apiKey, publishRequest)
@@ -103,12 +108,17 @@ class HttpApiSpec extends FlatSpec with BeforeAndAfterAll with ScalaFutures with
 
     val pullRequest = """{"returnImmediately":true,"maxMessages":1000}"""
 
-    mock.register(WireMock
-      .post(urlEqualTo(
-        s"/v1/projects/${TestCredentials.projectId}/subscriptions/sub1:pull?key=${TestCredentials.apiKey}"))
-      .withRequestBody(WireMock.equalTo(pullRequest))
-      .withHeader("Authorization", WireMock.equalTo("Bearer " + accessToken))
-      .willReturn(aResponse().withStatus(200).withBody(pullResponse).withHeader("Content-Type", "application/json")))
+    mock.register(
+      WireMock
+        .post(
+          urlEqualTo(
+            s"/v1/projects/${TestCredentials.projectId}/subscriptions/sub1:pull?key=${TestCredentials.apiKey}"
+          )
+        )
+        .withRequestBody(WireMock.equalTo(pullRequest))
+        .withHeader("Authorization", WireMock.equalTo("Bearer " + accessToken))
+        .willReturn(aResponse().withStatus(200).withBody(pullResponse).withHeader("Content-Type", "application/json"))
+    )
 
     val result = TestHttpApi.pull(TestCredentials.projectId, "sub1", accessToken, TestCredentials.apiKey)
     result.futureValue shouldBe PullResponse(Some(Seq(ReceivedMessage("ack1", publishMessage))))
 
@@ -121,12 +131,17 @@ class HttpApiSpec extends FlatSpec with BeforeAndAfterAll with ScalaFutures with
 
    val pullRequest = """{"returnImmediately":true,"maxMessages":1000}"""
 
-    mock.register(WireMock
-      .post(urlEqualTo(
-        s"/v1/projects/${TestCredentials.projectId}/subscriptions/sub1:pull?key=${TestCredentials.apiKey}"))
-      .withRequestBody(WireMock.equalTo(pullRequest))
-      .withHeader("Authorization", WireMock.equalTo("Bearer " + accessToken))
-      .willReturn(aResponse().withStatus(200).withBody(pullResponse).withHeader("Content-Type", "application/json")))
+    mock.register(
+      WireMock
+        .post(
+          urlEqualTo(
+            s"/v1/projects/${TestCredentials.projectId}/subscriptions/sub1:pull?key=${TestCredentials.apiKey}"
+          )
+        )
+        .withRequestBody(WireMock.equalTo(pullRequest))
+        .withHeader("Authorization", WireMock.equalTo("Bearer " + accessToken))
+        .willReturn(aResponse().withStatus(200).withBody(pullResponse).withHeader("Content-Type", "application/json"))
+    )
 
     val result = TestHttpApi.pull(TestCredentials.projectId, "sub1", accessToken, TestCredentials.apiKey)
     result.futureValue shouldBe PullResponse(None)
 
@@ -135,17 +150,25 @@ class HttpApiSpec extends FlatSpec with BeforeAndAfterAll with ScalaFutures with
   it should "acknowledge" in {
     val ackRequest = """{"ackIds":["ack1"]}"""
 
-    mock.register(WireMock
-      .post(urlEqualTo(
-        s"/v1/projects/${TestCredentials.projectId}/subscriptions/sub1:acknowledge?key=${TestCredentials.apiKey}"))
-      .withRequestBody(WireMock.equalTo(ackRequest))
-      .withHeader("Authorization", WireMock.equalTo("Bearer " + accessToken))
-      .willReturn(aResponse().withStatus(200)))
+    mock.register(
+      WireMock
+        .post(
+          urlEqualTo(
+            s"/v1/projects/${TestCredentials.projectId}/subscriptions/sub1:acknowledge?key=${TestCredentials.apiKey}"
+          )
+        )
+        .withRequestBody(WireMock.equalTo(ackRequest))
+        .withHeader("Authorization", WireMock.equalTo("Bearer " + accessToken))
+        .willReturn(aResponse().withStatus(200))
+    )
 
     val acknowledgeRequest = AcknowledgeRequest(Seq("ack1"))
 
-    val result = TestHttpApi.acknowledge(TestCredentials.projectId, "sub1", accessToken, TestCredentials.apiKey,
-                                         acknowledgeRequest)
+    val result =
+      TestHttpApi.acknowledge(TestCredentials.projectId,
+                              "sub1",
+                              accessToken,
+                              TestCredentials.apiKey,
+                              acknowledgeRequest)
 
     result.futureValue shouldBe (())
   }
diff --git a/google-cloud-pub-sub/src/test/scala/akka/stream/alpakka/googlecloud/pubsub/SessionSpec.scala b/google-cloud-pub-sub/src/test/scala/akka/stream/alpakka/googlecloud/pubsub/SessionSpec.scala
index a4cd934c79..3d1d839659 100644
--- a/google-cloud-pub-sub/src/test/scala/akka/stream/alpakka/googlecloud/pubsub/SessionSpec.scala
+++ b/google-cloud-pub-sub/src/test/scala/akka/stream/alpakka/googlecloud/pubsub/SessionSpec.scala
@@ -38,8 +38,8 @@ class SessionSpec extends FlatSpec with ScalaFutures with MockitoSugar with Befo
 
     session.getToken().futureValue shouldBe accessToken
 
-    verify(mockHttpApi, times(1)).getAccessToken(TestCredentials.clientEmail, TestCredentials.privateKey,
-                                                 Instant.ofEpochSecond(0))
+    verify(mockHttpApi, times(1))
+      .getAccessToken(TestCredentials.clientEmail, TestCredentials.privateKey, Instant.ofEpochSecond(0))
   }
 
   it should "uses the cached value for the second request" in new Fixtures {
@@ -51,8 +51,8 @@ class SessionSpec extends FlatSpec with ScalaFutures with MockitoSugar with Befo
 
     session.getToken().futureValue shouldBe accessToken
 
-    verify(mockHttpApi, times(0)).getAccessToken(TestCredentials.clientEmail, TestCredentials.privateKey,
-                                                 Instant.ofEpochSecond(0))
+    verify(mockHttpApi, times(0))
+      .getAccessToken(TestCredentials.clientEmail, TestCredentials.privateKey, Instant.ofEpochSecond(0))
   }
 
   it should "requests a new session if current is about to expire" in new Fixtures {
@@ -62,13 +62,14 @@ class SessionSpec extends FlatSpec with ScalaFutures with MockitoSugar with Befo
       this.maybeAccessToken = Some(Future.successful(AccessTokenExpiry("t1", 3600)))
     }
 
-    when(mockHttpApi.getAccessToken(TestCredentials.clientEmail, TestCredentials.privateKey,
-                                    Instant.ofEpochSecond(3599))).thenReturn(Future.successful(AccessTokenExpiry(accessToken, 3600)))
+    when(
+      mockHttpApi.getAccessToken(TestCredentials.clientEmail, TestCredentials.privateKey, Instant.ofEpochSecond(3599))
+    ).thenReturn(Future.successful(AccessTokenExpiry(accessToken, 3600)))
 
     session.getToken().futureValue shouldBe accessToken
 
-    verify(mockHttpApi, times(1)).getAccessToken(TestCredentials.clientEmail, TestCredentials.privateKey,
-                                                 Instant.ofEpochSecond(3599))
+    verify(mockHttpApi, times(1))
+      .getAccessToken(TestCredentials.clientEmail, TestCredentials.privateKey, Instant.ofEpochSecond(3599))
   }
 
   override def afterAll(): Unit =
diff --git a/hbase/src/main/scala/akka/stream/alpakka/hbase/internal/HBaseCapabilities.scala b/hbase/src/main/scala/akka/stream/alpakka/hbase/internal/HBaseCapabilities.scala
index 9ecb0bbb9d..da7c9b9227 100644
--- a/hbase/src/main/scala/akka/stream/alpakka/hbase/internal/HBaseCapabilities.scala
+++ b/hbase/src/main/scala/akka/stream/alpakka/hbase/internal/HBaseCapabilities.scala
@@ -46,7 +46,8 @@ private[internal] trait HBaseCapabilities { this: StageLogging =>
     Await.result(Future(ConnectionFactory.createConnection(conf)), timeout seconds)
 
   private[internal] def getOrCreateTable(tableName: TableName, columnFamilies: Seq[String])(
-      implicit connection: Connection): Try[Table] = twr(connection.getAdmin) { admin =>
+      implicit connection: Connection
+  ): Try[Table] = twr(connection.getAdmin) { admin =>
     val table =
       if (admin.isTableAvailable(tableName))
         connection.getTable(tableName)
diff --git a/hbase/src/main/scala/akka/stream/alpakka/hbase/internal/HBaseFlowStage.scala b/hbase/src/main/scala/akka/stream/alpakka/hbase/internal/HBaseFlowStage.scala
index 25b6b7645a..150408b30a 100644
--- a/hbase/src/main/scala/akka/stream/alpakka/hbase/internal/HBaseFlowStage.scala
+++ b/hbase/src/main/scala/akka/stream/alpakka/hbase/internal/HBaseFlowStage.scala
@@ -29,14 +29,12 @@ private[hbase] class HBaseFlowStage[A](settings: HTableSettings[A]) extends Grap
 
     lazy val table: Table = getOrCreateTable(settings.tableName, settings.columnFamilies).get
 
-    setHandler(out,
-               new OutHandler {
+    setHandler(out, new OutHandler {
       override def onPull() =
         pull(in)
     })
 
-    setHandler(in,
-               new InHandler {
+    setHandler(in, new InHandler {
       override def onPush() = {
         val msg = grab(in)
 
diff --git a/jms/src/main/scala/akka/stream/alpakka/jms/JmsSinkStage.scala b/jms/src/main/scala/akka/stream/alpakka/jms/JmsSinkStage.scala
index 956b2d9378..47db9566fd 100644
--- a/jms/src/main/scala/akka/stream/alpakka/jms/JmsSinkStage.scala
+++ b/jms/src/main/scala/akka/stream/alpakka/jms/JmsSinkStage.scala
@@ -30,15 +30,17 @@ final class JmsSinkStage(settings: JmsSettings) extends GraphStage[SinkShape[Str
       pull(in)
     }
 
-    setHandler(in,
+    setHandler(
+      in,
       new InHandler {
-      override def onPush(): Unit = {
-        val elem = grab(in)
-        val textMessage: TextMessage = jmsSession.session.createTextMessage(elem)
-        jmsProducer.send(textMessage)
-        pull(in)
+        override def onPush(): Unit = {
+          val elem = grab(in)
+          val textMessage: TextMessage = jmsSession.session.createTextMessage(elem)
+          jmsProducer.send(textMessage)
+          pull(in)
+        }
       }
-    })
+    )
 
   override def postStop(): Unit =
     Option(jmsSession).foreach(_.closeSession())
diff --git a/jms/src/main/scala/akka/stream/alpakka/jms/JmsSourceStage.scala b/jms/src/main/scala/akka/stream/alpakka/jms/JmsSourceStage.scala
index e2d92d3395..634d62a186 100644
--- a/jms/src/main/scala/akka/stream/alpakka/jms/JmsSourceStage.scala
+++ b/jms/src/main/scala/akka/stream/alpakka/jms/JmsSourceStage.scala
@@ -25,7 +25,8 @@ final class JmsSourceStage(settings: JmsSourceSettings) extends GraphStage[Sourc
 
   private[jms] def getDispatcher =
     inheritedAttributes.get[ActorAttributes.Dispatcher](
-      ActorAttributes.Dispatcher("akka.stream.default-blocking-io-dispatcher")) match {
+      ActorAttributes.Dispatcher("akka.stream.default-blocking-io-dispatcher")
+    ) match {
       case ActorAttributes.Dispatcher("") =>
         ActorAttributes.Dispatcher("akka.stream.default-blocking-io-dispatcher")
       case d => d
@@ -76,8 +77,7 @@ final class JmsSourceStage(settings: JmsSourceSettings) extends GraphStage[Sourc
         fail.invoke(e)
     }
 
-    setHandler(out,
-               new OutHandler {
+    setHandler(out, new OutHandler {
       override def onPull(): Unit =
         if (queue.nonEmpty) {
           pushMessage(queue.dequeue())
diff --git a/mqtt/src/main/scala/akka/stream/alpakka/mqtt/MqttProducerStage.scala b/mqtt/src/main/scala/akka/stream/alpakka/mqtt/MqttProducerStage.scala
index 8fcaa6caad..a96b21569c 100644
--- a/mqtt/src/main/scala/akka/stream/alpakka/mqtt/MqttProducerStage.scala
+++ b/mqtt/src/main/scala/akka/stream/alpakka/mqtt/MqttProducerStage.scala
@@ -30,22 +30,23 @@ final class MqttProducerStage(cs: MqttConnectionSettings, qos: MqttQoS)
         failStage(ex)
     }
 
-    setHandler(in,
+    setHandler(
+      in,
       new InHandler {
-      override def onPush() = {
-        val msg = grab(in)
-        val pahoMsg = new PahoMqttMessage(msg.payload.toArray)
-        pahoMsg.setQos(qos.byteValue)
-        mqttClient match {
-          case Some(client) => client.publish(msg.topic, pahoMsg, msg, onPublished.invoke _)
-          case None => failStage(MqttProducerStage.NoClientException)
-        }
+        override def onPush() = {
+          val msg = grab(in)
+          val pahoMsg = new PahoMqttMessage(msg.payload.toArray)
+          pahoMsg.setQos(qos.byteValue)
+          mqttClient match {
+            case Some(client) => client.publish(msg.topic, pahoMsg, msg, onPublished.invoke _)
+            case None => failStage(MqttProducerStage.NoClientException)
+          }
+        }
       }
-    })
+    )
 
-    setHandler(out,
-               new OutHandler {
+    setHandler(out, new OutHandler {
       override def onPull() =
         if (mqttClient.isDefined)
           pull(in)
     })
diff --git a/mqtt/src/main/scala/akka/stream/alpakka/mqtt/MqttSourceStage.scala b/mqtt/src/main/scala/akka/stream/alpakka/mqtt/MqttSourceStage.scala
index 92bb2a9776..6e8a8a5991 100644
--- a/mqtt/src/main/scala/akka/stream/alpakka/mqtt/MqttSourceStage.scala
+++ b/mqtt/src/main/scala/akka/stream/alpakka/mqtt/MqttSourceStage.scala
@@ -47,8 +47,7 @@ final class MqttSourceStage(settings: MqttSourceSettings, bufferSize: Int)
 
     override val connectionSettings = settings.connectionSettings
 
-    setHandler(out,
-               new OutHandler {
+    setHandler(out, new OutHandler {
       override def onPull(): Unit =
         if (queue.nonEmpty) {
           pushMessage(queue.dequeue())
diff --git a/mqtt/src/test/scala/akka/stream/alpakka/mqtt/scaladsl/MqttSinkSpec.scala b/mqtt/src/test/scala/akka/stream/alpakka/mqtt/scaladsl/MqttSinkSpec.scala
index ae297da7dd..26a7766994 100644
--- a/mqtt/src/test/scala/akka/stream/alpakka/mqtt/scaladsl/MqttSinkSpec.scala
+++ b/mqtt/src/test/scala/akka/stream/alpakka/mqtt/scaladsl/MqttSinkSpec.scala
@@ -45,8 +45,9 @@ class MqttSinkSpec
     val msg = MqttMessage(topic, ByteString("ohi"))
 
     val (subscribed, message) =
-      MqttSource(MqttSourceSettings(sourceSettings, Map(topic -> MqttQoS.atLeastOnce)),
-                 8).toMat(Sink.head)(Keep.both).run()
+      MqttSource(MqttSourceSettings(sourceSettings, Map(topic -> MqttQoS.atLeastOnce)), 8)
+        .toMat(Sink.head)(Keep.both)
+        .run()
 
     whenReady(subscribed) { _ =>
       Source.single(msg).runWith(MqttSink(sinkSettings, MqttQoS.atLeastOnce))
@@ -60,8 +61,10 @@ class MqttSinkSpec
     val numOfMessages = 42
 
     val (subscribed, messages) =
-      MqttSource(MqttSourceSettings(sourceSettings, Map(topic -> MqttQoS.atLeastOnce)),
-                 8).grouped(numOfMessages).toMat(Sink.head)(Keep.both).run()
+      MqttSource(MqttSourceSettings(sourceSettings, Map(topic -> MqttQoS.atLeastOnce)), 8)
+        .grouped(numOfMessages)
+        .toMat(Sink.head)(Keep.both)
+        .run()
 
     whenReady(subscribed) { _ =>
       Source(1 to numOfMessages).map(_ => msg).runWith(MqttSink(sinkSettings, MqttQoS.atLeastOnce))
diff --git a/mqtt/src/test/scala/akka/stream/alpakka/mqtt/scaladsl/MqttSourceSpec.scala b/mqtt/src/test/scala/akka/stream/alpakka/mqtt/scaladsl/MqttSourceSpec.scala
index 031fff262c..f70783ac88 100644
--- a/mqtt/src/test/scala/akka/stream/alpakka/mqtt/scaladsl/MqttSourceSpec.scala
+++ b/mqtt/src/test/scala/akka/stream/alpakka/mqtt/scaladsl/MqttSourceSpec.scala
@@ -99,7 +99,7 @@ class MqttSourceSpec
 
     "fail connection when not providing the requested credentials" in {
       val settings = MqttSourceSettings(sourceSettings.withAuth("username1", "bad_password"),
-                                        Map(secureTopic -> MqttQoS.AtLeastOnce))
+                                        Map(secureTopic -> MqttQoS.AtLeastOnce))
       val first = MqttSource(settings, 8).runWith(Sink.head)
       whenReady(first.failed) {
         case e: MqttException => e.getMessage should be("Not authorized to connect")
@@ -125,9 +125,11 @@ class MqttSourceSpec
       val settings = MqttSourceSettings(sourceSettings, Map(topic1 -> MqttQoS.AtLeastOnce))
       val (subscriptionFuture, probe) = MqttSource(settings, bufferSize).toMat(TestSink.probe)(Keep.both).run()
       whenReady(subscriptionFuture) { _ =>
-        Source(1 to bufferSize + overflow).map { i =>
-          MqttMessage(topic1, ByteString(s"ohi_$i"))
-        }.runWith(MqttSink(sinkSettings, MqttQoS.AtLeastOnce))
+        Source(1 to bufferSize + overflow)
+          .map { i =>
+            MqttMessage(topic1, ByteString(s"ohi_$i"))
+          }
+          .runWith(MqttSink(sinkSettings, MqttQoS.AtLeastOnce))
 
         (1 to bufferSize + overflow) foreach { i =>
           probe.requestNext shouldBe MqttMessage(topic1, ByteString(s"ohi_$i"))
@@ -144,9 +146,11 @@ class MqttSourceSpec
       whenReady(subscriptionFuture) { _ =>
         probe.request((bufferSize + overflow).toLong)
 
-        Source(1 to bufferSize + overflow).map { i =>
-          MqttMessage(topic1, ByteString(s"ohi_$i"))
-        }.runWith(MqttSink(sinkSettings, MqttQoS.AtLeastOnce))
+        Source(1 to bufferSize + overflow)
+          .map { i =>
+            MqttMessage(topic1, ByteString(s"ohi_$i"))
+          }
+          .runWith(MqttSink(sinkSettings, MqttQoS.AtLeastOnce))
 
         (1 to bufferSize + overflow) foreach { i =>
           probe.expectNext() shouldBe MqttMessage(topic1, ByteString(s"ohi_$i"))
@@ -182,11 +186,12 @@ class MqttSourceSpec
 
       whenReady(binding) { _ =>
         val settings = MqttSourceSettings(
-          sourceSettings
-            .withClientId("source-spec/testator")
-            .withBroker("tcp://localhost:1337")
-            .withWill(Will(MqttMessage(willTopic, ByteString("ohi")), MqttQoS.AtLeastOnce, retained = true)),
-          Map(willTopic -> MqttQoS.AtLeastOnce))
+          sourceSettings
+            .withClientId("source-spec/testator")
+            .withBroker("tcp://localhost:1337")
+            .withWill(Will(MqttMessage(willTopic, ByteString("ohi")), MqttQoS.AtLeastOnce, retained = true)),
+          Map(willTopic -> MqttQoS.AtLeastOnce)
+        )
         val source = MqttSource(settings, 8)
 
         val sub = source.toMat(Sink.head)(Keep.left).run()
@@ -198,7 +203,7 @@ class MqttSourceSpec
 
       {
         val settings = MqttSourceSettings(sourceSettings.withClientId("source-spec/executor"),
-                                          Map(willTopic -> MqttQoS.AtLeastOnce))
+                                          Map(willTopic -> MqttQoS.AtLeastOnce))
         val source = MqttSource(settings, 8)
 
         val (sub, elem) = source.toMat(Sink.head)(Keep.both).run()
diff --git a/s3/src/main/scala/akka/stream/alpakka/s3/S3Exception.scala b/s3/src/main/scala/akka/stream/alpakka/s3/S3Exception.scala
index 18048322bf..b139f42607 100644
--- a/s3/src/main/scala/akka/stream/alpakka/s3/S3Exception.scala
+++ b/s3/src/main/scala/akka/stream/alpakka/s3/S3Exception.scala
@@ -9,8 +9,10 @@ class S3Exception(val code: String, val message: String, val requestID: String,
     extends RuntimeException(message) {
 
   def this(xmlResponse: Elem) =
-    this((xmlResponse \ "Code").text, (xmlResponse \ "Message").text, (xmlResponse \ "RequestID").text,
-         (xmlResponse \ "HostID").text)
+    this((xmlResponse \ "Code").text,
+         (xmlResponse \ "Message").text,
+         (xmlResponse \ "RequestID").text,
+         (xmlResponse \ "HostID").text)
 
   def this(response: String) = this(XML.loadString(response))
 }
diff --git a/s3/src/main/scala/akka/stream/alpakka/s3/auth/Signer.scala b/s3/src/main/scala/akka/stream/alpakka/s3/auth/Signer.scala
index b2b2faaf2b..e48c8533e1 100644
--- a/s3/src/main/scala/akka/stream/alpakka/s3/auth/Signer.scala
+++ b/s3/src/main/scala/akka/stream/alpakka/s3/auth/Signer.scala
@@ -17,7 +17,8 @@ private[alpakka] object Signer {
   private val dateFormatter = DateTimeFormatter.ofPattern("YYYYMMdd'T'HHmmssX")
 
   def signedRequest(request: HttpRequest, key: SigningKey, date: ZonedDateTime = ZonedDateTime.now(ZoneOffset.UTC))(
-      implicit mat: Materializer): Future[HttpRequest] = {
+      implicit mat: Materializer
+  ): Future[HttpRequest] = {
     import mat.executionContext
     val hashedBody = request.entity.dataBytes.runWith(digest()).map {
       case hash => encodeHex(hash.toArray)
@@ -26,7 +27,7 @@ private[alpakka] object Signer {
     hashedBody.map {
      case hb =>
-                               RawHeader("x-amz-content-sha256", hb)) ++ sessionHeader(key.credentials)
+                                 RawHeader("x-amz-content-sha256", hb)) ++ sessionHeader(key.credentials)
         val reqWithHeaders = request.withHeaders(request.headers ++ headersToAdd)
         val cr = CanonicalRequest.from(reqWithHeaders)
         val authHeader = authorizationHeader("AWS4-HMAC-SHA256", key, date, cr)
diff --git a/s3/src/main/scala/akka/stream/alpakka/s3/impl/HttpRequests.scala b/s3/src/main/scala/akka/stream/alpakka/s3/impl/HttpRequests.scala
index 35c57977d7..938c53c028 100644
--- a/s3/src/main/scala/akka/stream/alpakka/s3/impl/HttpRequests.scala
+++ b/s3/src/main/scala/akka/stream/alpakka/s3/impl/HttpRequests.scala
@@ -56,7 +56,8 @@ private[alpakka] object HttpRequests {
 
   def completeMultipartUploadRequest(upload: MultipartUpload, parts: Seq[(Int, String)], region: String)(
       implicit ec: ExecutionContext,
-      conf: S3Settings): Future[HttpRequest] = {
+      conf: S3Settings
+  ): Future[HttpRequest] = {
 
     //Do not let the start PartNumber,ETag and the end PartNumber,ETag be on different lines
     // They tend to get split when this file is formatted by IntelliJ unless http://stackoverflow.com/a/19492318/1216965
diff --git a/s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Stream.scala b/s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Stream.scala
index f436d19c28..88c039b497 100644
--- a/s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Stream.scala
+++ b/s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Stream.scala
@@ -48,9 +48,9 @@ object S3Stream {
     new S3Stream(credentials, region, S3Settings(system))
 }
 
-private[alpakka] final class S3Stream(credentials: AWSCredentials, region: String, val settings: S3Settings)(
-    implicit system: ActorSystem,
-    mat: Materializer) {
+private[alpakka] final class S3Stream(credentials: AWSCredentials,
+                                      region: String,
+                                      val settings: S3Settings)(implicit system: ActorSystem, mat: Materializer) {
 
   import Marshalling._
   import HttpRequests._
@@ -76,8 +76,8 @@ private[alpakka] final class S3Stream(credentials: AWSCredentials, region: Strin
                       cannedAcl: CannedAcl = CannedAcl.Private,
                       chunkSize: Int = MinChunkSize,
                       chunkingParallelism: Int = 4): Sink[ByteString, Future[CompleteMultipartUploadResult]] =
-    chunkAndRequest(s3Location, contentType, metaHeaders, cannedAcl, chunkSize)(chunkingParallelism).toMat(
-      completionSink(s3Location))(Keep.right)
+    chunkAndRequest(s3Location, contentType, metaHeaders, cannedAcl, chunkSize)(chunkingParallelism)
+      .toMat(completionSink(s3Location))(Keep.right)
 
   private def initiateMultipartUpload(s3Location: S3Location,
                                       contentType: ContentType,
@@ -128,10 +128,13 @@ private[alpakka] final class S3Stream(credentials: AWSCredentials, region: Strin
                               metaHeaders: MetaHeaders,
                               cannedAcl: CannedAcl = CannedAcl.Private,
                               chunkSize: Int = MinChunkSize,
-                              parallelism: Int = 4): Flow[ByteString, (HttpRequest, (MultipartUpload, Int)), NotUsed] = {
+                              parallelism: Int = 4
+  ): Flow[ByteString, (HttpRequest, (MultipartUpload, Int)), NotUsed] = {
 
-    assert(chunkSize >= MinChunkSize,
-           "Chunk size must be at least 5242880B. See http://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPart.html")
+    assert(
+      chunkSize >= MinChunkSize,
+      "Chunk size must be at least 5242880B. See http://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPart.html"
+    )
 
     // First step of the multi part upload process is made.
     // The response is then used to construct the subsequent individual upload part requests
@@ -166,7 +169,8 @@ private[alpakka] final class S3Stream(credentials: AWSCredentials, region: Strin
                               contentType: ContentType,
                               metaHeaders: MetaHeaders,
                               cannedAcl: CannedAcl = CannedAcl.Private,
-                              chunkSize: Int = MinChunkSize)(parallelism: Int = 4): Flow[ByteString, UploadPartResponse, NotUsed] = {
+                              chunkSize: Int = MinChunkSize
+  )(parallelism: Int = 4): Flow[ByteString, UploadPartResponse, NotUsed] = {
 
     // Multipart upload requests (except for the completion api) are created here.
     // The initial upload request gets executed within this function as well.
@@ -190,17 +194,19 @@ private[alpakka] final class S3Stream(credentials: AWSCredentials, region: Strin
     import mat.executionContext
 
     Sink.seq[UploadPartResponse].mapMaterializedValue { responseFuture: Future[Seq[UploadPartResponse]] =>
-      responseFuture.flatMap { responses: Seq[UploadPartResponse] =>
-        val successes = responses.collect { case r: SuccessfulUploadPart => r }
-        val failures = responses.collect { case r: FailedUploadPart => r }
-        if (responses.isEmpty) {
-          Future.failed(new RuntimeException("No Responses"))
-        } else if (failures.isEmpty) {
-          Future.successful(successes.sortBy(_.index))
-        } else {
-          Future.failed(FailedUpload(failures.map(_.exception)))
+      responseFuture
+        .flatMap { responses: Seq[UploadPartResponse] =>
+          val successes = responses.collect { case r: SuccessfulUploadPart => r }
+          val failures = responses.collect { case r: FailedUploadPart => r }
+          if (responses.isEmpty) {
+            Future.failed(new RuntimeException("No Responses"))
+          } else if (failures.isEmpty) {
+            Future.successful(successes.sortBy(_.index))
+          } else {
+            Future.failed(FailedUpload(failures.map(_.exception)))
+          }
         }
-      }.flatMap(completeMultipartUpload(s3Location, _))
+        .flatMap(completeMultipartUpload(s3Location, _))
     }
   }
diff --git a/s3/src/test/scala/akka/stream/alpakka/s3/auth/SignerSpec.scala b/s3/src/test/scala/akka/stream/alpakka/s3/auth/SignerSpec.scala
index 6b79a6269e..3735c5dc1a 100644
--- a/s3/src/test/scala/akka/stream/alpakka/s3/auth/SignerSpec.scala
+++ b/s3/src/test/scala/akka/stream/alpakka/s3/auth/SignerSpec.scala
@@ -48,7 +48,8 @@ class SignerSpec(_system: ActorSystem) extends TestKit(_system) with FlatSpecLik
     val date = LocalDateTime.of(2015, 8, 30, 12, 36, 0).atZone(ZoneOffset.UTC)
     val stringToSign: String = Signer.stringToSign("AWS4-HMAC-SHA256", signingKey, date, cr)
     stringToSign should equal(
-      "AWS4-HMAC-SHA256\n20150830T123600Z\n20150830/us-east-1/iam/aws4_request\nf536975d06c0309214f805bb90ccff089219ecd68b2577efef23edd43b7e1a59")
+      "AWS4-HMAC-SHA256\n20150830T123600Z\n20150830/us-east-1/iam/aws4_request\nf536975d06c0309214f805bb90ccff089219ecd68b2577efef23edd43b7e1a59"
+    )
   }
 
   it should "add the date, content hash, and authorization headers to a request" in {
@@ -71,8 +72,10 @@ class SignerSpec(_system: ActorSystem) extends TestKit(_system) with FlatSpecLik
           RawHeader("Content-Type", "application/x-www-form-urlencoded; charset=utf-8"),
           RawHeader("x-amz-date", "20150830T123600Z"),
           RawHeader("x-amz-content-sha256", "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"),
-          RawHeader("Authorization",
-                    "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20150830/us-east-1/iam/aws4_request, SignedHeaders=content-type;host;x-amz-content-sha256;x-amz-date, Signature=dd479fa8a80364edf2119ec24bebde66712ee9c9cb2b0d92eb3ab9ccdc0c3947")
+          RawHeader(
+            "Authorization",
+            "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20150830/us-east-1/iam/aws4_request, SignedHeaders=content-type;host;x-amz-content-sha256;x-amz-date, Signature=dd479fa8a80364edf2119ec24bebde66712ee9c9cb2b0d92eb3ab9ccdc0c3947"
+          )
         )
       )
   }
 }
diff --git a/s3/src/test/scala/akka/stream/alpakka/s3/auth/SigningKeySpec.scala b/s3/src/test/scala/akka/stream/alpakka/s3/auth/SigningKeySpec.scala
index 92e730af54..cb94e3c6ea 100644
--- a/s3/src/test/scala/akka/stream/alpakka/s3/auth/SigningKeySpec.scala
+++ b/s3/src/test/scala/akka/stream/alpakka/s3/auth/SigningKeySpec.scala
@@ -26,6 +26,7 @@ class SigningKeySpec extends FlatSpec with Matchers {
     val sts =
       "AWS4-HMAC-SHA256\n20150830T123600Z\n20150830/us-east-1/iam/aws4_request\nf536975d06c0309214f805bb90ccff089219ecd68b2577efef23edd43b7e1a59"
     signingKey.hexEncodedSignature(sts.getBytes) should equal(
-      "5d672d79c15b13162d9279b0855cfba6789a8edb4c82c400e06b5924a6f2b5d7")
+      "5d672d79c15b13162d9279b0855cfba6789a8edb4c82c400e06b5924a6f2b5d7"
+    )
   }
 }
diff --git a/s3/src/test/scala/akka/stream/alpakka/s3/impl/DiskBufferSpec.scala b/s3/src/test/scala/akka/stream/alpakka/s3/impl/DiskBufferSpec.scala
index 368285c98c..df33223119 100644
--- a/s3/src/test/scala/akka/stream/alpakka/s3/impl/DiskBufferSpec.scala
+++ b/s3/src/test/scala/akka/stream/alpakka/s3/impl/DiskBufferSpec.scala
@@ -48,8 +48,10 @@ class DiskBufferSpec(_system: ActorSystem)
   it should "fail if more than maxSize bytes are fed into it" in {
     EventFilter[BufferOverflowException](occurrences = 1) intercept {
       whenReady(
-        Source(Vector(ByteString(1, 2, 3, 4, 5), ByteString(6, 7, 8, 9, 10, 11, 12),
-                      ByteString(13, 14))).via(new DiskBuffer(1, 10, None)).runWith(Sink.seq).failed
+        Source(Vector(ByteString(1, 2, 3, 4, 5), ByteString(6, 7, 8, 9, 10, 11, 12), ByteString(13, 14)))
+          .via(new DiskBuffer(1, 10, None))
+          .runWith(Sink.seq)
+          .failed
       ) { e =>
         e shouldBe a[BufferOverflowException]
       }
diff --git a/s3/src/test/scala/akka/stream/alpakka/s3/impl/MemoryBufferSpec.scala b/s3/src/test/scala/akka/stream/alpakka/s3/impl/MemoryBufferSpec.scala
index 0355bda192..25aea43a1b 100644
--- a/s3/src/test/scala/akka/stream/alpakka/s3/impl/MemoryBufferSpec.scala
+++ b/s3/src/test/scala/akka/stream/alpakka/s3/impl/MemoryBufferSpec.scala
@@ -40,8 +40,10 @@ class MemoryBufferSpec(_system: ActorSystem)
   it should "fail if more than maxSize bytes are fed into it" in {
     whenReady(
-      Source(Vector(ByteString(1, 2, 3, 4, 5), ByteString(6, 7, 8, 9, 10, 11, 12),
-                    ByteString(13, 14))).via(new MemoryBuffer(10)).runWith(Sink.seq).failed
+      Source(Vector(ByteString(1, 2, 3, 4, 5), ByteString(6, 7, 8, 9, 10, 11, 12), ByteString(13, 14)))
+        .via(new MemoryBuffer(10))
+        .runWith(Sink.seq)
+        .failed
     ) { e =>
       e shouldBe a[IllegalStateException]
     }
diff --git a/s3/src/test/scala/akka/stream/alpakka/s3/impl/S3SinkSpec.scala b/s3/src/test/scala/akka/stream/alpakka/s3/impl/S3SinkSpec.scala
index bb346571ef..4e18fc5ecb 100644
--- a/s3/src/test/scala/akka/stream/alpakka/s3/impl/S3SinkSpec.scala
+++ b/s3/src/test/scala/akka/stream/alpakka/s3/impl/S3SinkSpec.scala
@@ -26,27 +26,52 @@ class S3SinkSpec extends WireMockBase {
     val etag = "5b27a21a97fcf8a7004dd1d906e7a5ba"
     val url = s"http://testbucket.s3.amazonaws.com/testKey"
     mock
-      .register(post(urlEqualTo(s"/$key?uploads")).willReturn(aResponse().withStatus(200).withHeader("x-amz-id-2", "Uuag1LuByRx9e6j5Onimru9pO4ZVKnJ2Qz7/C1NPcfTWAtRPfTaOFg==").withHeader("x-amz-request-id", "656c76696e6727732072657175657374").withBody(s"""<?xml version="1.0" encoding="UTF-8"?>
+      .register(
+        post(urlEqualTo(s"/$key?uploads")).willReturn(
+          aResponse()
+            .withStatus(200)
+            .withHeader("x-amz-id-2", "Uuag1LuByRx9e6j5Onimru9pO4ZVKnJ2Qz7/C1NPcfTWAtRPfTaOFg==")
"Uuag1LuByRx9e6j5Onimru9pO4ZVKnJ2Qz7/C1NPcfTWAtRPfTaOFg==") + .withHeader("x-amz-request-id", "656c76696e6727732072657175657374") + .withBody(s""" | | $bucket | $key | $uploadId - |""".stripMargin))) + |""".stripMargin) + ) + ) - mock.register(put(urlEqualTo(s"/$key?partNumber=1&uploadId=$uploadId")) + mock.register( + put(urlEqualTo(s"/$key?partNumber=1&uploadId=$uploadId")) .withRequestBody(matching(body)) - .willReturn(aResponse().withStatus(200).withHeader("x-amz-id-2", "Zn8bf8aEFQ+kBnGPBc/JaAf9SoWM68QDPS9+SyFwkIZOHUG2BiRLZi5oXw4cOCEt").withHeader("x-amz-request-id", "5A37448A37622243").withHeader("ETag", "\"" + etag + "\""))) + .willReturn( + aResponse() + .withStatus(200) + .withHeader("x-amz-id-2", "Zn8bf8aEFQ+kBnGPBc/JaAf9SoWM68QDPS9+SyFwkIZOHUG2BiRLZi5oXw4cOCEt") + .withHeader("x-amz-request-id", "5A37448A37622243") + .withHeader("ETag", "\"" + etag + "\"") + ) + ) - mock.register(post(urlEqualTo(s"/$key?uploadId=$uploadId")) + mock.register( + post(urlEqualTo(s"/$key?uploadId=$uploadId")) .withRequestBody(containing("CompleteMultipartUpload")) .withRequestBody(containing(etag)) - .willReturn(aResponse().withStatus(200).withHeader("Content-Type", "application/xml; charset=UTF-8").withHeader("x-amz-id-2", "Zn8bf8aEFQ+kBnGPBc/JaAf9SoWM68QDPS9+SyFwkIZOHUG2BiRLZi5oXw4cOCEt").withHeader("x-amz-request-id", "5A37448A3762224333").withBody(s""" + .willReturn( + aResponse() + .withStatus(200) + .withHeader("Content-Type", "application/xml; charset=UTF-8") + .withHeader("x-amz-id-2", "Zn8bf8aEFQ+kBnGPBc/JaAf9SoWM68QDPS9+SyFwkIZOHUG2BiRLZi5oXw4cOCEt") + .withHeader("x-amz-request-id", "5A37448A3762224333") + .withBody(s""" | | $url | $bucket | $key | "$etag" - |""".stripMargin))) + |""".stripMargin) + ) + ) val result = Source(ByteString(body) :: Nil) .toMat(new S3Client(AWSCredentials("", ""), "us-east-1").multipartUpload(bucket, key))(Keep.right) @@ -56,8 +81,10 @@ class S3SinkSpec extends WireMockBase { } it should "fail if request returns 404" in { - val result = new S3Client(AWSCredentials("", ""), - "us-east-1").download("sometest4398673", "30000184.xml").map(_.decodeString("utf8")).runWith(Sink.head) + val result = new S3Client(AWSCredentials("", ""), "us-east-1") + .download("sometest4398673", "30000184.xml") + .map(_.decodeString("utf8")) + .runWith(Sink.head) whenReady(result.failed) { e => e shouldBe a[S3Exception] e.asInstanceOf[S3Exception].code should equal("NoSuchKey") diff --git a/s3/src/test/scala/akka/stream/alpakka/s3/impl/S3SourceSpec.scala b/s3/src/test/scala/akka/stream/alpakka/s3/impl/S3SourceSpec.scala index b60efa0172..fe3cf411eb 100644 --- a/s3/src/test/scala/akka/stream/alpakka/s3/impl/S3SourceSpec.scala +++ b/s3/src/test/scala/akka/stream/alpakka/s3/impl/S3SourceSpec.scala @@ -19,17 +19,25 @@ class S3SourceSpec extends WireMockBase { "S3Source" should "work in a happy case" in { val body = "Some content" mock - .register(get(urlEqualTo("/testKey")).willReturn(aResponse().withStatus(200).withHeader("ETag", """"fba9dede5f27731c9771645a39863328"""").withBody(body))) + .register( + get(urlEqualTo("/testKey")).willReturn( + aResponse().withStatus(200).withHeader("ETag", """"fba9dede5f27731c9771645a39863328"""").withBody(body) + ) + ) - val result = new S3Client(AWSCredentials("", ""), - "us-east-1").download("testBucket", "testKey").map(_.decodeString("utf8")).runWith(Sink.head) + val result = new S3Client(AWSCredentials("", ""), "us-east-1") + .download("testBucket", "testKey") + .map(_.decodeString("utf8")) + .runWith(Sink.head) Await.ready(result, 
   }
 
   it should "fail if request returns 404" in {
-    val result = new S3Client(AWSCredentials("", ""),
-                              "us-east-1").download("sometest4398673", "30000184.xml").map(_.decodeString("utf8")).runWith(Sink.head)
+    val result = new S3Client(AWSCredentials("", ""), "us-east-1")
+      .download("sometest4398673", "30000184.xml")
+      .map(_.decodeString("utf8"))
+      .runWith(Sink.head)
     whenReady(result.failed) { e =>
       e shouldBe a[S3Exception]
       e.asInstanceOf[S3Exception].code should equal("NoSuchKey")
diff --git a/s3/src/test/scala/akka/stream/alpakka/s3/impl/SplitAfterSizeSpec.scala b/s3/src/test/scala/akka/stream/alpakka/s3/impl/SplitAfterSizeSpec.scala
index 75e27fc662..54693ded94 100644
--- a/s3/src/test/scala/akka/stream/alpakka/s3/impl/SplitAfterSizeSpec.scala
+++ b/s3/src/test/scala/akka/stream/alpakka/s3/impl/SplitAfterSizeSpec.scala
@@ -48,10 +48,11 @@ class SplitAfterSizeSpec(_system: ActorSystem)
       )
       .runWith(Sink.seq)
       .futureValue should be(
-      Seq(
-        Seq(ByteString(1, 2, 3, 4, 5), ByteString(6, 7, 8, 9, 10, 11, 12)),
-        Seq(ByteString(13, 14))
-      ))
+      Seq(
+        Seq(ByteString(1, 2, 3, 4, 5), ByteString(6, 7, 8, 9, 10, 11, 12)),
+        Seq(ByteString(13, 14))
+      )
+    )
   }
 
 }
diff --git a/s3/src/test/scala/akka/stream/alpakka/s3/impl/WireMockBase.scala b/s3/src/test/scala/akka/stream/alpakka/s3/impl/WireMockBase.scala
index d33315ec22..9f17a9e562 100644
--- a/s3/src/test/scala/akka/stream/alpakka/s3/impl/WireMockBase.scala
+++ b/s3/src/test/scala/akka/stream/alpakka/s3/impl/WireMockBase.scala
@@ -34,8 +34,8 @@ object WireMockBase {
   )
 
   def getCallerName(clazz: Class[_]): String = {
-    val s = (Thread.currentThread.getStackTrace map (_.getClassName) drop 1).dropWhile(
-      _ matches "(java.lang.Thread|.*WireMockBase.?$)")
+    val s = (Thread.currentThread.getStackTrace map (_.getClassName) drop 1)
+      .dropWhile(_ matches "(java.lang.Thread|.*WireMockBase.?$)")
     val reduced = s.lastIndexWhere(_ == clazz.getName) match {
       case -1 ⇒ s
       case z ⇒ s drop (z + 1)
@@ -45,11 +45,12 @@ def initServer(): WireMockServer = {
     val server = new WireMockServer(
-      wireMockConfig()
-        .dynamicPort()
-        .dynamicHttpsPort()
-        .keystorePath("./s3/src/test/resources/keystore.jks")
-        .keystorePassword("abcdefg"))
+      wireMockConfig()
+        .dynamicPort()
+        .dynamicHttpsPort()
+        .keystorePath("./s3/src/test/resources/keystore.jks")
+        .keystorePassword("abcdefg")
+    )
     server.start()
     server
   }
@@ -73,11 +74,15 @@ abstract class WireMockBase(_system: ActorSystem, _wireMockServe
 
   override def beforeAll(): Unit =
     mock.register(
-      any(anyUrl()).willReturn(
-        aResponse()
-          .withStatus(404)
-          .withBody("<Error><Code>NoSuchKey</Code><Message>No key found</Message>" +
-            "<RequestId>XXXX</RequestId><HostId>XXXX</HostId></Error>")))
+      any(anyUrl()).willReturn(
+        aResponse()
+          .withStatus(404)
+          .withBody(
+            "<Error><Code>NoSuchKey</Code><Message>No key found</Message>" +
+              "<RequestId>XXXX</RequestId><HostId>XXXX</HostId></Error>"
+          )
+      )
+    )
 
   override def afterAll(): Unit = _wireMockServer.stop()
 }
diff --git a/simple-codecs/src/main/scala/akka/stream/alpakka/recordio/scaladsl/RecordIOFraming.scala b/simple-codecs/src/main/scala/akka/stream/alpakka/recordio/scaladsl/RecordIOFraming.scala
index 069a49e37c..bfb9e38c9d 100644
--- a/simple-codecs/src/main/scala/akka/stream/alpakka/recordio/scaladsl/RecordIOFraming.scala
+++ b/simple-codecs/src/main/scala/akka/stream/alpakka/recordio/scaladsl/RecordIOFraming.scala
@@ -110,8 +110,10 @@ object RecordIOFraming {
         Try(recordSizePrefix.utf8String.toInt) match {
           case Success(length) if length > maxRecordLength =>
             failStage(
-              new FramingException(
-                s"Record of size $length bytes exceeds maximum of $maxRecordLength bytes."))
+              new FramingException(
+                s"Record of size $length bytes exceeds maximum of $maxRecordLength bytes."
+              )
+            )
           case Success(length) if length < 0 =>
             failStage(new FramingException(s"Record size prefix $length is negative."))
           case Success(length) =>
diff --git a/simple-codecs/src/test/scala/akka/stream/alpakka/recordio/RecordIOFramingSpec.scala b/simple-codecs/src/test/scala/akka/stream/alpakka/recordio/RecordIOFramingSpec.scala
index a369dff434..1059100503 100644
--- a/simple-codecs/src/test/scala/akka/stream/alpakka/recordio/RecordIOFramingSpec.scala
+++ b/simple-codecs/src/test/scala/akka/stream/alpakka/recordio/RecordIOFramingSpec.scala
@@ -44,8 +44,8 @@ class RecordIOFramingSpec(_system: ActorSystem)
     // When
     //#run-via-scanner
     val result = basicSource via
-      RecordIOFraming.scanner() runWith
-      Sink.seq
+        RecordIOFraming.scanner() runWith
+        Sink.seq
     //#run-via-scanner
 
     // Then
@@ -60,8 +60,8 @@ class RecordIOFramingSpec(_system: ActorSystem)
 
     // When
     val result = Source.single(ByteString(recordIOInput)) via
-      RecordIOFraming.scanner() runWith
-      stringSeqSink
+        RecordIOFraming.scanner() runWith
+        stringSeqSink
 
     // Then
     result.futureValue shouldBe Seq(FirstRecordData, SecondRecordData)
@@ -75,12 +75,13 @@ class RecordIOFramingSpec(_system: ActorSystem)
     val chunkedInput =
       Seq(whitespaceChunk, secondChunk, thirdChunk, whitespaceChunk, fifthChunk, sixthChunk, whitespaceChunk).map(
-        ByteString(_))
+        ByteString(_)
+      )
 
     // When
     val result = Source(chunkedInput) via
-      RecordIOFraming.scanner() runWith
-      stringSeqSink
+        RecordIOFraming.scanner() runWith
+        stringSeqSink
 
     // Then
     result.futureValue shouldBe Seq(FirstRecordData, SecondRecordData)
@@ -89,9 +90,9 @@ class RecordIOFramingSpec(_system: ActorSystem)
   it should "handle an empty stream" in {
     // When
     val result =
-      Source.empty via
-      RecordIOFraming.scanner() runWith
-      stringSeqSink
+        Source.empty via
+        RecordIOFraming.scanner() runWith
+        stringSeqSink
 
     // Then
     result.futureValue shouldBe Seq.empty
@@ -103,8 +104,8 @@ class RecordIOFramingSpec(_system: ActorSystem)
 
     // When
    val result = Source(input) via
-      RecordIOFraming.scanner() runWith
-      stringSeqSink
+        RecordIOFraming.scanner() runWith
+        stringSeqSink
 
     // Then
     result.futureValue shouldBe Seq.empty
@@ -116,8 +117,8 @@ class RecordIOFramingSpec(_system: ActorSystem)
 
     // When
     val result = Source.single(ByteString(recordIOInput)) via
-      RecordIOFraming.scanner(1024) runWith
-      stringSeqSink
+        RecordIOFraming.scanner(1024) runWith
+        stringSeqSink
 
     // Then
     result.failed.futureValue shouldBe a[NumberFormatException]
@@ -129,8 +130,8 @@ class RecordIOFramingSpec(_system: ActorSystem)
 
     // When
     val result = infinitePrefixSource via
-      RecordIOFraming.scanner(1024) runWith
-      stringSeqSink
+        RecordIOFraming.scanner(1024) runWith
+        stringSeqSink
 
     // Then
     result.failed.futureValue shouldBe a[FramingException]
@@ -142,8 +143,8 @@ class RecordIOFramingSpec(_system: ActorSystem)
 
     // When
    val result = Source.single(ByteString(recordIOInput)) via
-      RecordIOFraming.scanner() runWith
-      stringSeqSink
+        RecordIOFraming.scanner() runWith
+        stringSeqSink
 
     // Then
     result.failed.futureValue shouldBe a[FramingException]
@@ -155,8 +156,8 @@ class RecordIOFramingSpec(_system: ActorSystem)
 
     // When
     val result = Source.single(ByteString(recordIOInput)) via
-      RecordIOFraming.scanner(FirstRecordData.length - 1) runWith
-      stringSeqSink
+        RecordIOFraming.scanner(FirstRecordData.length - 1) runWith
+        stringSeqSink
 
     // Then
     result.failed.futureValue shouldBe a[FramingException]
@@ -168,8 +169,8 @@ class RecordIOFramingSpec(_system: ActorSystem)
 
     // When
     val result = Source.single(ByteString(recordIOInput)) via
-      RecordIOFraming.scanner() runWith
-      stringSeqSink
+        RecordIOFraming.scanner() runWith
+        stringSeqSink
 
     // Then
     result.failed.futureValue shouldBe a[FramingException]
diff --git a/sqs/src/main/scala/akka/stream/alpakka/sqs/SqsAckSinkStage.scala b/sqs/src/main/scala/akka/stream/alpakka/sqs/SqsAckSinkStage.scala
index deb1681e31..3cb9f69c24 100644
--- a/sqs/src/main/scala/akka/stream/alpakka/sqs/SqsAckSinkStage.scala
+++ b/sqs/src/main/scala/akka/stream/alpakka/sqs/SqsAckSinkStage.scala
@@ -56,39 +56,41 @@ private[sqs] class SqsAckSinkStageLogic(
   private var amazonSendMessageHandler: AsyncHandler[SendMessageRequest, SendMessageResult] = _
  private var amazonDeleteMessageHandler: AsyncHandler[DeleteMessageRequest, DeleteMessageResult] = _
 
-  setHandler(in,
+  setHandler(
+    in,
     new InHandler {
-      override def onPush(): Unit = {
-        inFlight += 1
-        val (message, action) = grab(in)
-        action match {
-          case Ack() =>
-            sqsClient.deleteMessageAsync(
-              new DeleteMessageRequest(queueUrl, message.getReceiptHandle),
-              amazonDeleteMessageHandler
-            )
-          case RequeueWithDelay(delaySeconds) =>
-            sqsClient.sendMessageAsync(
-              new SendMessageRequest(queueUrl, message.getBody).withDelaySeconds(delaySeconds),
-              amazonSendMessageHandler
-            )
+      override def onPush(): Unit = {
+        inFlight += 1
+        val (message, action) = grab(in)
+        action match {
+          case Ack() =>
+            sqsClient.deleteMessageAsync(
+              new DeleteMessageRequest(queueUrl, message.getReceiptHandle),
+              amazonDeleteMessageHandler
+            )
+          case RequeueWithDelay(delaySeconds) =>
+            sqsClient.sendMessageAsync(
+              new SendMessageRequest(queueUrl, message.getBody).withDelaySeconds(delaySeconds),
+              amazonSendMessageHandler
+            )
+        }
+
+        tryPull()
+      }
 
-        tryPull()
-      }
-
-      override def onUpstreamFailure(exception: Throwable): Unit = {
-        log.error(exception, "Upstream failure: {}", exception.getMessage)
-        failStage(exception)
-        promise.tryFailure(exception)
-      }
+      override def onUpstreamFailure(exception: Throwable): Unit = {
+        log.error(exception, "Upstream failure: {}", exception.getMessage)
+        failStage(exception)
+        promise.tryFailure(exception)
+      }
 
-      override def onUpstreamFinish(): Unit = {
-        log.debug("Upstream finish")
-        isShutdownInProgress = true
-        tryShutdown()
+      override def onUpstreamFinish(): Unit = {
+        log.debug("Upstream finish")
+        isShutdownInProgress = true
+        tryShutdown()
+      }
     }
-  })
+  )
 
   override def preStart(): Unit = {
     setKeepGoing(true)
diff --git a/sqs/src/main/scala/akka/stream/alpakka/sqs/SqsSinkStage.scala b/sqs/src/main/scala/akka/stream/alpakka/sqs/SqsSinkStage.scala
index 375595db2e..f61f5f3f38 100644
--- a/sqs/src/main/scala/akka/stream/alpakka/sqs/SqsSinkStage.scala
+++ b/sqs/src/main/scala/akka/stream/alpakka/sqs/SqsSinkStage.scala
@@ -51,25 +51,27 @@ private[sqs] class SqsSinkStageLogic(
   private var isShutdownInProgress = false
   private var amazonSendMessageHandler: AsyncHandler[SendMessageRequest, SendMessageResult] = _
 
-  setHandler(in,
+  setHandler(
+    in,
     new InHandler {
-      override def onPush(): Unit = {
-        inFlight += 1
-        sqsClient.sendMessageAsync(new SendMessageRequest(queueUrl, grab(in)), amazonSendMessageHandler)
-        tryPull()
+      override def onPush(): Unit = {
+        inFlight += 1
+        sqsClient.sendMessageAsync(new SendMessageRequest(queueUrl, grab(in)), amazonSendMessageHandler)
+        tryPull()
+      }
+
+      override def onUpstreamFailure(exception: Throwable): Unit = {
+        log.error(exception, "Upstream failure: {}", exception.getMessage)
+        failStage(exception)
+        promise.tryFailure(exception)
+      }
+
+      override def onUpstreamFinish(): Unit = {
+        isShutdownInProgress = true
+        tryShutdown()
+      }
       }
-
-      override def onUpstreamFailure(exception: Throwable): Unit = {
-        log.error(exception, "Upstream failure: {}", exception.getMessage)
-        failStage(exception)
-        promise.tryFailure(exception)
-      }
-
-      override def onUpstreamFinish(): Unit = {
-        isShutdownInProgress = true
-        tryShutdown()
-      }
-    })
+    )
 
   override def preStart(): Unit = {
     setKeepGoing(true)
diff --git a/sqs/src/main/scala/akka/stream/alpakka/sqs/SqsSourceStage.scala b/sqs/src/main/scala/akka/stream/alpakka/sqs/SqsSourceStage.scala
index 09d2b13949..bf1eeefeb5 100644
--- a/sqs/src/main/scala/akka/stream/alpakka/sqs/SqsSourceStage.scala
+++ b/sqs/src/main/scala/akka/stream/alpakka/sqs/SqsSourceStage.scala
@@ -30,9 +30,9 @@ final case class SqsSourceSettings(
   require(maxBatchSize <= maxBufferSize, "maxBatchSize must be lower or equal than maxBufferSize")
   // SQS requirements
   require(0 <= waitTimeSeconds && waitTimeSeconds <= 20,
-          s"Invalid value ($waitTimeSeconds) for waitTimeSeconds. Requirement: 0 <= waitTimeSeconds <= 20 ")
+         s"Invalid value ($waitTimeSeconds) for waitTimeSeconds. Requirement: 0 <= waitTimeSeconds <= 20 ")
   require(1 <= maxBatchSize && maxBatchSize <= 10,
-          s"Invalid value ($maxBatchSize) for maxBatchSize. Requirement: 1 <= maxBatchSize <= 10 ")
+         s"Invalid value ($maxBatchSize) for maxBatchSize. Requirement: 1 <= maxBatchSize <= 10 ")
 }
 //#SqsSourceSettings
@@ -68,14 +68,16 @@ final class SqsSourceStage(queueUrl: String, settings: SqsSourceSettings)(implic
           .withMaxNumberOfMessages(settings.maxBatchSize)
           .withWaitTimeSeconds(settings.waitTimeSeconds)
 
-      sqsClient.receiveMessageAsync(request,
+      sqsClient.receiveMessageAsync(
+        request,
         new AsyncHandler[ReceiveMessageRequest, ReceiveMessageResult] {
-          override def onError(e: Exception): Unit =
-            failureCallback.invoke(e)
+          override def onError(e: Exception): Unit =
+            failureCallback.invoke(e)
 
-          override def onSuccess(request: ReceiveMessageRequest, result: ReceiveMessageResult): Unit =
-            successCallback.invoke(result)
-        })
+          override def onSuccess(request: ReceiveMessageRequest, result: ReceiveMessageResult): Unit =
+            successCallback.invoke(result)
+        }
+      )
     }
 
   def handleFailure(ex: Exception): Unit =
@@ -97,17 +99,19 @@ final class SqsSourceStage(queueUrl: String, settings: SqsSourceSettings)(implic
       }
     }
 
-    setHandler(out,
+    setHandler(
+      out,
       new OutHandler {
-        override def onPull(): Unit =
-          if (!buffer.isEmpty) {
-            push(out, buffer.poll())
-            if (canReceiveNewMessages) {
+        override def onPull(): Unit =
+          if (!buffer.isEmpty) {
+            push(out, buffer.poll())
+            if (canReceiveNewMessages) {
+              receiveMessages()
+            }
+          } else {
             receiveMessages()
           }
-          } else {
-            receiveMessages()
-          }
-      })
+      }
+    )
   }
 }
diff --git a/sqs/src/main/scala/akka/stream/alpakka/sqs/scaladsl/SqsAckSink.scala b/sqs/src/main/scala/akka/stream/alpakka/sqs/scaladsl/SqsAckSink.scala
index 20a4a63a01..a2c02dd2fa 100644
--- a/sqs/src/main/scala/akka/stream/alpakka/sqs/scaladsl/SqsAckSink.scala
+++ b/sqs/src/main/scala/akka/stream/alpakka/sqs/scaladsl/SqsAckSink.scala
@@ -16,6 +16,7 @@ object SqsAckSink {
    * Scala API: creates a [[SqsAckSinkStage]] for a SQS queue using an [[com.amazonaws.services.sqs.AmazonSQSAsync]]
    */
   def apply(queueUrl: String, settings: SqsAckSinkSettings = SqsAckSinkSettings.Defaults)(
-      implicit sqsClient: AmazonSQSAsync): Sink[MessageActionPair, Future[Done]] =
+      implicit sqsClient: AmazonSQSAsync
+  ): Sink[MessageActionPair, Future[Done]] =
     Sink.fromGraph(new SqsAckSinkStage(queueUrl, settings, sqsClient))
 }
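[Editor's note: the `scaladsl` hunks above and below only re-wrap the implicit parameter block for scalafmt 0.6.6 (closing parenthesis on its own line), so call sites compile unchanged. The following is a hedged usage sketch, not part of the patch: the queue URL and the default client construction are assumptions; only SqsSource, SqsAckSink, Ack and RequeueWithDelay come from the diffs in this series.]

// Usage sketch under the assumptions above.
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.sqs.{Ack, RequeueWithDelay}
import akka.stream.alpakka.sqs.scaladsl.{SqsAckSink, SqsSource}
import com.amazonaws.services.sqs.{AmazonSQSAsync, AmazonSQSAsyncClientBuilder}

object SqsAckSketch extends App {
  implicit val system: ActorSystem = ActorSystem("sqs-ack-sketch")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  // Resolved by the `(implicit sqsClient: AmazonSQSAsync)` block in SqsAckSink.apply.
  implicit val sqsClient: AmazonSQSAsync = AmazonSQSAsyncClientBuilder.defaultClient() // assumption

  val queueUrl = "https://sqs.eu-central-1.amazonaws.com/123456789012/example" // assumption

  // Each (Message, MessageAction) pair drives the `action match` in
  // SqsAckSinkStageLogic: Ack() deletes the message, RequeueWithDelay resends it.
  SqsSource(queueUrl)
    .map(m => if (m.getBody.nonEmpty) (m, Ack()) else (m, RequeueWithDelay(5)))
    .runWith(SqsAckSink(queueUrl))
}

[Passing the client implicitly keeps the stage constructors (`new SqsAckSinkStage(queueUrl, settings, sqsClient)`) explicit while sparing every call site a repeated argument.]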
diff --git a/sqs/src/main/scala/akka/stream/alpakka/sqs/scaladsl/SqsSink.scala b/sqs/src/main/scala/akka/stream/alpakka/sqs/scaladsl/SqsSink.scala
index c046d2216e..95dcd1b907 100644
--- a/sqs/src/main/scala/akka/stream/alpakka/sqs/scaladsl/SqsSink.scala
+++ b/sqs/src/main/scala/akka/stream/alpakka/sqs/scaladsl/SqsSink.scala
@@ -16,6 +16,7 @@ object SqsSink {
    * Scala API: creates a [[SqsSinkStage]] for a SQS queue using an [[AmazonSQSAsync]]
    */
   def apply(queueUrl: String, settings: SqsSinkSettings = SqsSinkSettings.Defaults)(
-      implicit sqsClient: AmazonSQSAsync): Sink[String, Future[Done]] =
+      implicit sqsClient: AmazonSQSAsync
+  ): Sink[String, Future[Done]] =
     Sink.fromGraph(new SqsSinkStage(queueUrl, settings, sqsClient))
 }
diff --git a/sse/src/main/scala/akka/stream/alpakka/sse/scaladsl/EventSource.scala b/sse/src/main/scala/akka/stream/alpakka/sse/scaladsl/EventSource.scala
index 7011d6e8a8..d71a1a6d6a 100644
--- a/sse/src/main/scala/akka/stream/alpakka/sse/scaladsl/EventSource.scala
+++ b/sse/src/main/scala/akka/stream/alpakka/sse/scaladsl/EventSource.scala
@@ -73,7 +73,8 @@ object EventSource {
    * @return continuous source of server-sent events
    */
   def apply(uri: Uri, send: HttpRequest => Future[HttpResponse], lastEventId: Option[String] = None)(
-      implicit mat: Materializer): EventSource = {
+      implicit mat: Materializer
+  ): EventSource = {
     import EventStreamUnmarshalling._
     import mat.executionContext