Skip to content

Commit

Permalink
Merge pull request #231 from hseeberger/upgrade-scalafmt
Browse files Browse the repository at this point in the history
Upgrade to scalafmt 0.6.6
  • Loading branch information
2m authored Mar 29, 2017
2 parents 0238f77 + 6c0faf9 commit 358bb4f
Show file tree
Hide file tree
Showing 53 changed files with 696 additions and 448 deletions.
1 change: 0 additions & 1 deletion .scalafmt.conf
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,3 @@ indentOperator = spray
maxColumn = 120
rewrite.rules = [RedundantBraces, RedundantParens, SortImports]
unindentTopLevelOperators = true
binPack.callSite = true
28 changes: 15 additions & 13 deletions amqp/src/main/scala/akka/stream/alpakka/amqp/AmqpSinkStage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -57,21 +57,23 @@ final class AmqpSinkStage(settings: AmqpSinkSettings)
pull(in)
}

setHandler(in,
setHandler(
in,
new InHandler {
override def onPush(): Unit = {
val elem = grab(in)
channel.basicPublish(
exchange,
routingKey,
elem.mandatory,
elem.immediate,
elem.props.orNull,
elem.bytes.toArray
)
pull(in)
override def onPush(): Unit = {
val elem = grab(in)
channel.basicPublish(
exchange,
routingKey,
elem.mandatory,
elem.immediate,
elem.props.orNull,
elem.bytes.toArray
)
pull(in)
}
}
})
)

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,7 @@ final class AmqpSourceStage(settings: AmqpSourceSettings, bufferSize: Int)
}
}

setHandler(out,
new OutHandler {
setHandler(out, new OutHandler {
override def onPull(): Unit =
if (queue.nonEmpty) {
pushAndAckMessage(queue.dequeue())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,11 @@ class AmqpConnectorsSpec extends AmqpSpec {
val merge = b.add(Merge[IncomingMessage](count))
for (n <- 0 until count) {
val source = b.add(
AmqpSource(
NamedQueueSourceSettings(DefaultAmqpConnection, queueName).withDeclarations(queueDeclaration),
bufferSize = 1
))
AmqpSource(
NamedQueueSourceSettings(DefaultAmqpConnection, queueName).withDeclarations(queueDeclaration),
bufferSize = 1
)
)
source.out ~> merge.in(n)
}

Expand Down Expand Up @@ -232,7 +233,8 @@ class AmqpConnectorsSpec extends AmqpSpec {

import system.dispatcher
system.scheduler.scheduleOnce(5.seconds)(
completion.tryFailure(new Error("Did not get at least one element from every fanout branch")))
completion.tryFailure(new Error("Did not get at least one element from every fanout branch"))
)

Source.repeat("stuff").map(s => ByteString(s)).runWith(amqpSink)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ object AwsLambdaFlow {
/**
* Scala API: creates a [[AwsLambdaFlowStage]] for a AWS Lambda function invocation using [[AWSLambdaClient]]
*/
def apply(parallelism: Int)(
implicit awsLambdaClient: AWSLambdaAsyncClient): Flow[InvokeRequest, InvokeResult, NotUsed] =
def apply(
parallelism: Int
)(implicit awsLambdaClient: AWSLambdaAsyncClient): Flow[InvokeRequest, InvokeResult, NotUsed] =
Flow.fromGraph(new AwsLambdaFlowStage(awsLambdaClient)(parallelism))

}
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@ class AwsLambdaFlowSpec

"call a single invoke request" in {

when(awsLambdaClient.invokeAsync(mockitoEq(invokeRequest),
mockitoAny[AsyncHandler[InvokeRequest, InvokeResult]]())).thenAnswer(new Answer[AnyRef] {
when(
awsLambdaClient.invokeAsync(mockitoEq(invokeRequest), mockitoAny[AsyncHandler[InvokeRequest, InvokeResult]]())
).thenAnswer(new Answer[AnyRef] {
override def answer(invocation: InvocationOnMock): AnyRef = {
invocation.getArgument[AsyncHandler[InvokeRequest, InvokeResult]](1).onSuccess(invokeRequest, invokeResult)
CompletableFuture.completedFuture(invokeResult)
Expand All @@ -67,14 +68,16 @@ class AwsLambdaFlowSpec

future.map(_ shouldBe invokeResult)
verify(awsLambdaClient, times(1)).invokeAsync(mockitoEq(invokeRequest),
mockitoAny[AsyncHandler[InvokeRequest, InvokeResult]]())
mockitoAny[AsyncHandler[InvokeRequest, InvokeResult]]())

}

"call with exception" in {

when(awsLambdaClient.invokeAsync(mockitoAny[InvokeRequest](),
mockitoAny[AsyncHandler[InvokeRequest, InvokeResult]]())).thenAnswer(new Answer[Future[InvokeResult]] {
when(
awsLambdaClient.invokeAsync(mockitoAny[InvokeRequest](),
mockitoAny[AsyncHandler[InvokeRequest, InvokeResult]]())
).thenAnswer(new Answer[Future[InvokeResult]] {
override def answer(invocation: InvocationOnMock): Future[InvokeResult] = {
val exception = new RuntimeException("Error in lambda")
invocation.getArgument[AsyncHandler[InvokeRequest, InvokeResult]](1).onError(exception)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,24 @@ final class CassandraSourceStage(futStmt: Future[Statement], session: Session) e
futRs.onComplete(futFetchedCallback.invoke)
}

setHandler(out,
setHandler(
out,
new OutHandler {
override def onPull(): Unit = {
implicit val ec = materializer.executionContext
override def onPull(): Unit = {
implicit val ec = materializer.executionContext

maybeRs match {
case Some(rs) if rs.getAvailableWithoutFetching > 0 => push(out, rs.one())
case Some(rs) if rs.isExhausted => completeStage()
case Some(rs) =>
// fetch next page
val futRs = rs.fetchMoreResults().asScala()
futRs.onComplete(futFetchedCallback.invoke)
case None => () // doing nothing, waiting for futRs in preStart() to be completed
maybeRs match {
case Some(rs) if rs.getAvailableWithoutFetching > 0 => push(out, rs.one())
case Some(rs) if rs.isExhausted => completeStage()
case Some(rs) =>
// fetch next page
val futRs = rs.fetchMoreResults().asScala()
futRs.onComplete(futFetchedCallback.invoke)
case None => () // doing nothing, waiting for futRs in preStart() to be completed
}
}
}
})
)

private def tryPushAfterFetch(rsOrFailure: Try[ResultSet]): Unit = rsOrFailure match {
case Success(rs) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ import scala.concurrent.{ExecutionContext, Future}
import akka.stream.alpakka.cassandra.GuavaFutures._

object CassandraSink {
def apply[T](parallelism: Int,
statement: PreparedStatement,
statementBinder: (T, PreparedStatement) => BoundStatement)(
implicit session: Session,
ex: ExecutionContext): Sink[T, Future[Done]] =
def apply[T](
parallelism: Int,
statement: PreparedStatement,
statementBinder: (T, PreparedStatement) => BoundStatement
)(implicit session: Session, ex: ExecutionContext): Sink[T, Future[Done]] =
Flow[T]
.mapAsyncUnordered(parallelism)(t session.executeAsync(statementBinder(t, statement)).asScala())
.toMat(Sink.ignore)(Keep.right)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,16 @@ private[alpakka] trait AwsClient[S <: ClientSettings] {
val amzHeaders = original.getHeaders
val body = read(original.getContent)

val httpr = HttpRequest(uri = signableUrl, method = original.getHttpMethod,
val httpr = HttpRequest(
uri = signableUrl,
method = original.getHttpMethod,
headers = List(
headers.RawHeader("x-amz-date", amzHeaders.get("X-Amz-Date")),
headers.RawHeader("authorization", amzHeaders.get("Authorization")),
headers.RawHeader("x-amz-target", amzHeaders.get("X-Amz-Target"))
),
entity = HttpEntity(defaultContentType, body))
entity = HttpEntity(defaultContentType, body)
)

httpr -> AwsRequestMetadata(requestId.getAndIncrement(), s)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ import akka.stream.scaladsl.{Sink, Source}
import com.amazonaws.AmazonServiceException
import com.amazonaws.http.HttpResponseHandler

class DynamoClientImpl(val settings: DynamoSettings,
val errorResponseHandler: HttpResponseHandler[AmazonServiceException])(
implicit protected val system: ActorSystem,
implicit protected val materializer: ActorMaterializer)
class DynamoClientImpl(
val settings: DynamoSettings,
val errorResponseHandler: HttpResponseHandler[AmazonServiceException]
)(implicit protected val system: ActorSystem, implicit protected val materializer: ActorMaterializer)
extends AwsClient[DynamoSettings] {

override protected val service = "dynamodb"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,30 +14,32 @@ private[alpakka] trait DynamoProtocol {
val meta = new JsonOperationMetadata().withPayloadJson(true)

val protocol: SdkJsonProtocolFactory = new SdkJsonProtocolFactory(
new JsonClientMetadata()
.addAllErrorMetadata(
new JsonErrorShapeMetadata()
.withErrorCode("ItemCollectionSizeLimitExceededException")
.withModeledClass(classOf[ItemCollectionSizeLimitExceededException]),
new JsonErrorShapeMetadata()
.withErrorCode("ResourceInUseException")
.withModeledClass(classOf[ResourceInUseException]),
new JsonErrorShapeMetadata()
.withErrorCode("ResourceNotFoundException")
.withModeledClass(classOf[ResourceNotFoundException]),
new JsonErrorShapeMetadata()
.withErrorCode("ProvisionedThroughputExceededException")
.withModeledClass(classOf[ProvisionedThroughputExceededException]),
new JsonErrorShapeMetadata()
.withErrorCode("ConditionalCheckFailedException")
.withModeledClass(classOf[ConditionalCheckFailedException]),
new JsonErrorShapeMetadata()
.withErrorCode("InternalServerError")
.withModeledClass(classOf[InternalServerErrorException]),
new JsonErrorShapeMetadata()
.withErrorCode("LimitExceededException")
.withModeledClass(classOf[LimitExceededException]))
.withBaseServiceExceptionClass(classOf[com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException]))
new JsonClientMetadata()
.addAllErrorMetadata(
new JsonErrorShapeMetadata()
.withErrorCode("ItemCollectionSizeLimitExceededException")
.withModeledClass(classOf[ItemCollectionSizeLimitExceededException]),
new JsonErrorShapeMetadata()
.withErrorCode("ResourceInUseException")
.withModeledClass(classOf[ResourceInUseException]),
new JsonErrorShapeMetadata()
.withErrorCode("ResourceNotFoundException")
.withModeledClass(classOf[ResourceNotFoundException]),
new JsonErrorShapeMetadata()
.withErrorCode("ProvisionedThroughputExceededException")
.withModeledClass(classOf[ProvisionedThroughputExceededException]),
new JsonErrorShapeMetadata()
.withErrorCode("ConditionalCheckFailedException")
.withModeledClass(classOf[ConditionalCheckFailedException]),
new JsonErrorShapeMetadata()
.withErrorCode("InternalServerError")
.withModeledClass(classOf[InternalServerErrorException]),
new JsonErrorShapeMetadata()
.withErrorCode("LimitExceededException")
.withModeledClass(classOf[LimitExceededException])
)
.withBaseServiceExceptionClass(classOf[com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException])
)

val errorResponseHandler: HttpResponseHandler[AmazonServiceException] =
protocol.createErrorResponseHandler(new JsonErrorResponseMetadata())
Expand Down
16 changes: 10 additions & 6 deletions dynamodb/src/test/scala/akka/stream/alpakka/dynamodb/TestOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,14 @@ object ItemSpecOps extends TestOps {
new PutItemRequest().withTableName(tableName).withItem((keyMap("A", 1) + ("data" -> S(test5Data))).asJava)

val batchWriteItemRequest = new BatchWriteItemRequest().withRequestItems(
Map(
tableName ->
List(new WriteRequest(new PutRequest().withItem((keyMap("B", 0) + ("data" -> S(test5Data))).asJava)),
new WriteRequest(new PutRequest().withItem((keyMap("B",
1) + ("data" -> S(test5Data))).asJava))).asJava).asJava)
Map(
tableName ->
List(
new WriteRequest(new PutRequest().withItem((keyMap("B", 0) + ("data" -> S(test5Data))).asJava)),
new WriteRequest(new PutRequest().withItem((keyMap("B", 1) + ("data" -> S(test5Data))).asJava))
).asJava
).asJava
)

val deleteItemRequest = new DeleteItemRequest().withTableName(tableName).withKey(keyMap("A", 0).asJava)

Expand Down Expand Up @@ -102,7 +105,8 @@ object TableSpecOps extends TestOps {
val updateTableRequest = new UpdateTableRequest()
.withTableName(tableName)
.withProvisionedThroughput(
new ProvisionedThroughput().withWriteCapacityUnits(newMaxLimit).withReadCapacityUnits(newMaxLimit))
new ProvisionedThroughput().withWriteCapacityUnits(newMaxLimit).withReadCapacityUnits(newMaxLimit)
)

val deleteTableRequest = common.deleteTableRequest

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,27 +32,29 @@ private[ftp] trait FtpBrowserGraphStage[FtpClient, S <: RemoteFileSettings] exte

private[this] var buffer: Seq[FtpFile] = Seq.empty[FtpFile]

setHandler(out,
setHandler(
out,
new OutHandler {
def onPull(): Unit = {
fillBuffer()
buffer match {
case head +: tail =>
buffer = tail
push(out, head)
case _ => finalize()
}
def finalize() = try { disconnect() } finally { complete(out) }
} // end of onPull

override def onDownstreamFinish(): Unit =
try {
disconnect()
} finally {
matSuccess()
super.onDownstreamFinish()
}
}) // end of handler
def onPull(): Unit = {
fillBuffer()
buffer match {
case head +: tail =>
buffer = tail
push(out, head)
case _ => finalize()
}
def finalize() = try { disconnect() } finally { complete(out) }
} // end of onPull

override def onDownstreamFinish(): Unit =
try {
disconnect()
} finally {
matSuccess()
super.onDownstreamFinish()
}
}
) // end of handler

protected[this] def doPreStart(): Unit =
buffer = initBuffer(basePath)
Expand Down
Loading

0 comments on commit 358bb4f

Please sign in to comment.