diff --git a/build.sbt b/build.sbt index bce4b7a..7c90d20 100644 --- a/build.sbt +++ b/build.sbt @@ -52,6 +52,7 @@ lazy val migrator = (project in file("migrator")).enablePlugins(BuildInfoPlugin) "com.datastax.spark" %% "spark-cassandra-connector" % "3.5.0-1-g468079b4", "com.github.jnr" % "jnr-posix" % "3.1.19", // Needed by the cassandra connector "com.scylladb.alternator" % "emr-dynamodb-hadoop" % "5.6.1", + "com.scylladb.alternator" % "load-balancing" % "1.0.0", "io.circe" %% "circe-generic" % "0.14.7", "io.circe" %% "circe-parser" % "0.14.7", "io.circe" %% "circe-yaml" % "0.15.1", diff --git a/migrator/src/main/scala/com/scylladb/migrator/DynamoUtils.scala b/migrator/src/main/scala/com/scylladb/migrator/DynamoUtils.scala index ad77fc7..50b9dc6 100644 --- a/migrator/src/main/scala/com/scylladb/migrator/DynamoUtils.scala +++ b/migrator/src/main/scala/com/scylladb/migrator/DynamoUtils.scala @@ -1,8 +1,11 @@ package com.scylladb.migrator +import com.scylladb.alternator.AlternatorEndpointProvider import com.scylladb.migrator.config.{ DynamoDBEndpoint, SourceSettings, TargetSettings } -import org.apache.hadoop.dynamodb.DynamoDBConstants +import org.apache.hadoop.conf.{ Configurable, Configuration } import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat +import org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat +import org.apache.hadoop.dynamodb.{ DynamoDBConstants, DynamoDbClientBuilderTransformer } import org.apache.hadoop.mapred.JobConf import org.apache.log4j.LogManager import software.amazon.awssdk.auth.credentials.{ @@ -10,7 +13,7 @@ import software.amazon.awssdk.auth.credentials.{ AwsCredentialsProvider, ProfileCredentialsProvider } -import software.amazon.awssdk.services.dynamodb.DynamoDbClient +import software.amazon.awssdk.services.dynamodb.{ DynamoDbClient, DynamoDbClientBuilder } import software.amazon.awssdk.services.dynamodb.model.{ BillingMode, CreateTableRequest, @@ -30,6 +33,7 @@ import software.amazon.awssdk.services.dynamodb.model.{ import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient import java.util.stream.Collectors +import java.net.URI import scala.util.{ Failure, Success, Try } import scala.jdk.OptionConverters._ @@ -230,11 +234,13 @@ object DynamoUtils { jobConf.set(DynamoDBConstants.YARN_RESOURCE_MANAGER_ENABLED, "false") jobConf.set( - DynamoDBConstants.CUSTOM_CREDENTIALS_PROVIDER_CONF, - "com.scylladb.migrator.DynamoUtils$ProfileCredentialsProvider") + DynamoDBConstants.CUSTOM_CLIENT_BUILDER_TRANSFORMER, + classOf[AlternatorLoadBalancingEnabler].getName) + jobConf.set( - "mapred.output.format.class", - "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat") + DynamoDBConstants.CUSTOM_CREDENTIALS_PROVIDER_CONF, + classOf[ProfileCredentialsProvider].getName) + jobConf.set("mapred.output.format.class", classOf[DynamoDBOutputFormat].getName) jobConf.set("mapred.input.format.class", classOf[DynamoDBInputFormat].getName) } @@ -284,4 +290,17 @@ object DynamoUtils { def resolveCredentials(): AwsCredentials = delegate.resolveCredentials() } + class AlternatorLoadBalancingEnabler extends DynamoDbClientBuilderTransformer with Configurable { + private var conf: Configuration = null + + override def apply(builder: DynamoDbClientBuilder): DynamoDbClientBuilder = + builder.endpointProvider( + new AlternatorEndpointProvider(URI.create(conf.get(DynamoDBConstants.ENDPOINT))) + ) + + override def setConf(configuration: Configuration): Unit = + conf = configuration + override def getConf: Configuration = conf + } + }