Merge pull request #199 from julienrf/improve-debugging
Improve debug on migration
julienrf authored Aug 27, 2024
2 parents 04aa85c + f8ec5d6 commit a05419b
Showing 10 changed files with 84 additions and 79 deletions.
9 changes: 4 additions & 5 deletions ansible/files/config.dynamodb.yml
@@ -1,4 +1,3 @@
---
source:
type: dynamodb
table: "YOUR_TABLE_NAME"
@@ -25,8 +24,7 @@ source:
# Can set it equal to scanSegments. Or to allow multiple concurrent scans by
# a single task, set scanSegments to some multiple of maxMapTasks.
maxMapTasks: 700
streamChanges: false
# For scyllaDB - Alternator target, you need to specify endpoint URL.

target:
type: dynamodb
table: "YOUR_TABLE_NAME"
@@ -43,16 +41,17 @@ target:
credentials:
accessKey: empty
secretKey: empty
#scanSegments: 10000
maxMapTasks: 1
streamChanges: false

savepoints:
# Where should savepoint configurations be stored? This is a path on the host running
# the Spark driver - usually the Spark master.
path: /app/savepoints
# Interval in which savepoints will be created
intervalSeconds: 300

renames: []

validation:
# Should WRITETIMEs and TTLs be compared?
compareTimestamps: false
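
The comments above suggest either setting maxMapTasks equal to scanSegments or making scanSegments a multiple of maxMapTasks so that one Spark task runs several segment scans concurrently. A minimal sizing sketch in Scala, reusing the example value of 700 map tasks from this file (the factor of 4 is purely illustrative, not taken from the commit):

    // Hypothetical sizing sketch, not part of this commit.
    val maxMapTasks  = 700                          // example value used above
    val scansPerTask = 4                            // illustrative choice
    val scanSegments = scansPerTask * maxMapTasks   // 2800 segments, ~4 concurrent scans per task
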
20 changes: 7 additions & 13 deletions config.yaml.example
@@ -218,20 +218,14 @@ target:
# # Optional - the session name to use. If not set, we use 'scylla-migrator'
# sessionName: <roleSessionName>
#
# # Split factor for reading/writing. This is required for Scylla targets.
# scanSegments: 1
#
# # throttling settings, set based on your capacity (or wanted capacity)
# readThroughput: 1
#
# # The value of dynamodb.throughput.read.percent can be between 0.1 and 1.5, inclusively.
# # 0.5 represents the default read rate, meaning that the job will attempt to consume half of the read capacity of the table.
# # If you increase the value above 0.5, spark will increase the request rate; decreasing the value below 0.5 decreases the read request rate.
# # (The actual read rate will vary, depending on factors such as whether there is a uniform key distribution in the DynamoDB table.)
# throughputReadPercent: 1.0
# # throttling settings, set based on your write capacity units (or wanted capacity)
# writeThroughput: 1
#
# # how many tasks per executor?
# maxMapTasks: 1
# # The value of dynamodb.throughput.write.percent can be between 0.1 and 1.5, inclusively.
# # 0.5 represents the default write rate, meaning that the job will attempt to consume half of the write capacity of the table.
# # If you increase the value above 0.5, spark will increase the request rate; decreasing the value below 0.5 decreases the write request rate.
# # (The actual write rate will vary, depending on factors such as whether there is a uniform key distribution in the DynamoDB table.)
# throughputWritePercent: 1.0
#
# # When transferring DynamoDB sources to DynamoDB targets (such as other DynamoDB tables or Alternator tables),
# # the migrator supports transferring live changes occurring on the source table after transferring an initial
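
The throughputWritePercent comments above describe a simple multiplicative throttle: the job targets the configured (or derived) write capacity scaled by this factor. A rough sketch with invented numbers, not values taken from this commit:

    // Illustrative arithmetic only; the actual rate also depends on key distribution.
    val writeThroughput        = 1000L   // WCUs, configured or derived from the table
    val throughputWritePercent = 0.5     // consume half of that capacity (the documented default rate)
    val targetWritesPerSecond  = writeThroughput * throughputWritePercent  // ~500 WCU/s
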
19 changes: 8 additions & 11 deletions docs/source/configuration.rst
@@ -156,7 +156,7 @@ A source of type ``dynamodb`` can be used together with a target of type ``dynamodb``
# Optional - Split factor for reading. The default is to split the source data into chunks
# of 128 MB that can be processed in parallel by the Spark executors.
scanSegments: 1
# Optional - Throttling settings, set based on your database capacity (or wanted capacity)
# Optional - Throttling settings, set based on your database read capacity units (or wanted capacity)
readThroughput: 1
# Optional - Can be between 0.1 and 1.5, inclusively.
# 0.5 represents the default read rate, meaning that the job will attempt to consume half of the read capacity of the table.
@@ -295,17 +295,14 @@ DynamoDB Target
type: dynamodb
# Name of the table to write. If it does not exist, it will be created on the fly.
table: <table>
# Optional - Split factor for writing.
scanSegments: 1
# Optional - Throttling settings, set based on your database capacity (or wanted capacity)
readThroughput: 1
# Optional - Throttling settings, set based on your database write capacity units (or wanted capacity).
# By default, for provisioned tables we use the configured write capacity units, and for on-demand tables we use the value 40000.
writeThroughput: 1
# Optional - Can be between 0.1 and 1.5, inclusively.
# 0.5 represents the default read rate, meaning that the job will attempt to consume half of the read capacity of the table.
# If you increase the value above 0.5, spark will increase the request rate; decreasing the value below 0.5 decreases the read request rate.
# (The actual read rate will vary, depending on factors such as whether there is a uniform key distribution in the DynamoDB table.)
throughputReadPercent: 1.0
# Optional - At most how many tasks per Spark executor? Default is to use the same as 'scanSegments'.
maxMapTasks: 1
# 0.5 represents the default write rate, meaning that the job will attempt to consume half of the write capacity of the table.
# If you increase the value above 0.5, spark will increase the request rate; decreasing the value below 0.5 decreases the write request rate.
# (The actual write rate will vary, depending on factors such as whether there is a uniform key distribution in the DynamoDB table.)
throughputWritePercent: 1.0
# When transferring DynamoDB sources to DynamoDB targets (such as other DynamoDB tables or Alternator tables),
# the migrator supports transferring live changes occurring on the source table after transferring an initial
# snapshot.
19 changes: 7 additions & 12 deletions docs/source/migrate-from-dynamodb.rst
@@ -193,20 +193,15 @@ Additionally, you can also set the following optional properties:
accessKey: <access-key>
secretKey: <secret-key>
# Split factor for writing.
scanSegments: 1
# Throttling settings, set based on your database capacity (or wanted capacity)
readThroughput: 1
# Throttling settings, set based on your database write capacity units (or wanted capacity).
# By default, for provisioned tables we use the configured write capacity units, and for on-demand tables we use the value 40000.
writeThroughput: 1
# Can be between 0.1 and 1.5, inclusively.
# 0.5 represents the default read rate, meaning that the job will attempt to consume half of the read capacity of the table.
# If you increase the value above 0.5, spark will increase the request rate; decreasing the value below 0.5 decreases the read request rate.
# (The actual read rate will vary, depending on factors such as whether there is a uniform key distribution in the DynamoDB table.)
throughputReadPercent: 1.0
# At most how many tasks per Spark executor?
maxMapTasks: 1
# 0.5 represents the default write rate, meaning that the job will attempt to consume half of the write capacity of the table.
# If you increase the value above 0.5, spark will increase the request rate; decreasing the value below 0.5 decreases the write request rate.
# (The actual write rate will vary, depending on factors such as whether there is a uniform key distribution in the DynamoDB table.)
throughputWritePercent: 1.0
# When streamChanges is true, skip the initial snapshot transfer and only stream changes.
# This setting is ignored if streamChanges is false.
37 changes: 16 additions & 21 deletions migrator/src/main/scala/com/scylladb/migrator/DynamoUtils.scala
@@ -217,8 +217,14 @@ object DynamoUtils {
maybeScanSegments: Option[Int],
maybeMaxMapTasks: Option[Int],
maybeAwsCredentials: Option[AWSCredentials]): Unit = {
setOptionalConf(jobConf, DynamoDBConstants.REGION, maybeRegion)
setOptionalConf(jobConf, DynamoDBConstants.ENDPOINT, maybeEndpoint.map(_.renderEndpoint))
for (region <- maybeRegion) {
log.info(s"Using AWS region: ${region}")
jobConf.set(DynamoDBConstants.REGION, region)
}
for (endpoint <- maybeEndpoint) {
log.info(s"Using AWS endpoint: ${endpoint.renderEndpoint}")
jobConf.set(DynamoDBConstants.ENDPOINT, endpoint.renderEndpoint)
}
setOptionalConf(jobConf, DynamoDBConstants.SCAN_SEGMENTS, maybeScanSegments.map(_.toString))
setOptionalConf(jobConf, DynamoDBConstants.MAX_MAP_TASKS, maybeMaxMapTasks.map(_.toString))
for (credentials <- maybeAwsCredentials) {
@@ -245,42 +251,31 @@
}

/**
* @return The read throughput (in bytes per second) of the
* provided table description.
* @return The read throughput (in RCU) of the provided table description.
* If the table billing mode is PROVISIONED, it returns the
* table RCU multiplied by the number of bytes per read
* capacity unit. Otherwise (e.g. ,in case of on-demand
* table RCU. Otherwise (e.g., in case of on-demand
* billing mode), it returns
* [[DynamoDBConstants.DEFAULT_CAPACITY_FOR_ON_DEMAND]].
*/
def tableReadThroughput(description: TableDescription): Long =
tableThroughput(
description,
DynamoDBConstants.BYTES_PER_READ_CAPACITY_UNIT.longValue(),
_.readCapacityUnits)
tableThroughput(description, _.readCapacityUnits)

/**
* @return The write throughput (in bytes per second) of the
* provided table description.
* @return The write throughput (in WCU) of the provided table description.
* If the table billing mode is PROVISIONED, it returns the
* table WCU multiplied by the number of bytes per write
* capacity unit. Otherwise (e.g. ,in case of on-demand
* table WCU. Otherwise (e.g., in case of on-demand
* billing mode), it returns
* [[DynamoDBConstants.DEFAULT_CAPACITY_FOR_ON_DEMAND]].
*/
def tableWriteThroughput(description: TableDescription): Long =
tableThroughput(
description,
DynamoDBConstants.BYTES_PER_WRITE_CAPACITY_UNIT.longValue(),
_.writeCapacityUnits)
tableThroughput(description, _.writeCapacityUnits)

private def tableThroughput(description: TableDescription,
bytesPerCapacityUnit: Long,
capacityUnits: ProvisionedThroughputDescription => Long): Long =
if (description.billingModeSummary == null || description.billingModeSummary.billingMode == BillingMode.PROVISIONED) {
capacityUnits(description.provisionedThroughput) * bytesPerCapacityUnit
capacityUnits(description.provisionedThroughput)
} else {
DynamoDBConstants.DEFAULT_CAPACITY_FOR_ON_DEMAND * bytesPerCapacityUnit
DynamoDBConstants.DEFAULT_CAPACITY_FOR_ON_DEMAND
}

/** Reflection-friendly credentials provider used by the EMR DynamoDB connector */
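
With this change, tableReadThroughput and tableWriteThroughput return plain capacity units instead of bytes per second. A minimal usage sketch, assuming the AWS SDK v2 model builders and the on-demand default of 40000 capacity units mentioned in config.yaml.example; it is an illustration, not code from this commit:

    import software.amazon.awssdk.services.dynamodb.model.{
      BillingMode, BillingModeSummary, ProvisionedThroughputDescription, TableDescription
    }

    val provisioned = TableDescription.builder()
      .provisionedThroughput(
        ProvisionedThroughputDescription.builder().readCapacityUnits(100L).writeCapacityUnits(50L).build())
      .build()
    DynamoUtils.tableReadThroughput(provisioned)   // 100 (RCUs, no byte conversion)
    DynamoUtils.tableWriteThroughput(provisioned)  // 50 (WCUs)

    val onDemand = TableDescription.builder()
      .billingModeSummary(BillingModeSummary.builder().billingMode(BillingMode.PAY_PER_REQUEST).build())
      .build()
    DynamoUtils.tableReadThroughput(onDemand)      // DynamoDBConstants.DEFAULT_CAPACITY_FOR_ON_DEMAND (40000, per the docs above)
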
Original file line number Diff line number Diff line change
@@ -41,10 +41,10 @@ object AlternatorValidator {
targetSettings.finalCredentials,
targetSettings.region,
targetSettings.table,
targetSettings.scanSegments,
targetSettings.maxMapTasks,
readThroughput = None,
throughputReadPercent = None
sourceSettings.scanSegments, // Reuse same settings as source table
sourceSettings.maxMapTasks,
sourceSettings.readThroughput,
sourceSettings.throughputReadPercent
)

// Define some aliases to prevent the Spark engine to try to serialize the whole object graph
Original file line number Diff line number Diff line change
@@ -26,10 +26,8 @@ object TargetSettings {
region: Option[String],
credentials: Option[AWSCredentials],
table: String,
scanSegments: Option[Int],
writeThroughput: Option[Int],
throughputWritePercent: Option[Float],
maxMapTasks: Option[Int],
streamChanges: Boolean,
skipInitialSnapshotTransfer: Option[Boolean])
extends TargetSettings {
Original file line number Diff line number Diff line change
@@ -7,12 +7,15 @@ import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
import org.apache.hadoop.dynamodb.{ DynamoDBConstants, DynamoDBItemWritable }
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.JobConf
import org.apache.log4j.LogManager
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import software.amazon.awssdk.services.dynamodb.model.{ DescribeTableRequest, TableDescription }

object DynamoDB {

val log = LogManager.getLogger("com.scylladb.migrator.readers.DynamoDB")

def readRDD(
spark: SparkSession,
source: SourceSettings.DynamoDB): (RDD[(Text, DynamoDBItemWritable)], TableDescription) =
@@ -106,11 +109,18 @@ object DynamoDB {
jobConf,
DynamoDBConstants.TABLE_SIZE_BYTES,
Option(description.tableSizeBytes).map(_.toString))
jobConf.set(
DynamoDBConstants.READ_THROUGHPUT,
readThroughput
.getOrElse(DynamoUtils.tableReadThroughput(description))
.toString)
val throughput = readThroughput match {
case Some(value) =>
log.info(
s"Setting up Hadoop job to read the table using a configured throughput of ${value} RCU(s)")
value
case None =>
val value = DynamoUtils.tableReadThroughput(description)
log.info(
s"Setting up Hadoop job to read the table using a computed throughput of ${value} RCU(s)")
value
}
jobConf.set(DynamoDBConstants.READ_THROUGHPUT, throughput.toString)
setOptionalConf(
jobConf,
DynamoDBConstants.THROUGHPUT_READ_PERCENT,
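
The new log statements make it explicit whether the reader throughput was taken from the configuration or computed from the table description. As a hypothetical illustration (the value 1000 is invented), a source configured with readThroughput: 1000 would produce a driver log line like:

    Setting up Hadoop job to read the table using a configured throughput of 1000 RCU(s)

while omitting readThroughput falls back to DynamoUtils.tableReadThroughput and logs the computed value instead.
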
Original file line number Diff line number Diff line change
@@ -6,7 +6,7 @@ import com.scylladb.migrator.config.TargetSettings
import org.apache.hadoop.dynamodb.{ DynamoDBConstants, DynamoDBItemWritable }
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.JobConf
import org.apache.log4j.{ Level, LogManager }
import org.apache.log4j.LogManager
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import software.amazon.awssdk.services.dynamodb.model.{ AttributeValue, TableDescription }
@@ -15,6 +15,8 @@ import java.util

object DynamoDB {

val log = LogManager.getLogger("com.scylladb.migrator.writers.DynamoDB")

def writeRDD(target: TargetSettings.DynamoDB,
renamesMap: Map[String, String],
rdd: RDD[(Text, DynamoDBItemWritable)],
@@ -26,12 +28,22 @@ object DynamoDB {
jobConf,
target.region,
target.endpoint,
target.scanSegments,
target.maxMapTasks,
maybeScanSegments = None,
maybeMaxMapTasks = None,
target.finalCredentials)
jobConf.set(DynamoDBConstants.OUTPUT_TABLE_NAME, target.table)
val writeThroughput =
target.writeThroughput.getOrElse(DynamoUtils.tableWriteThroughput(targetTableDesc))
target.writeThroughput match {
case Some(value) =>
log.info(
s"Setting up Hadoop job to write table using a configured throughput of ${value} WCU(s)")
value
case None =>
val value = DynamoUtils.tableWriteThroughput(targetTableDesc)
log.info(
s"Setting up Hadoop job to write table using a calculated throughput of ${value} WCU(s)")
value
}
jobConf.set(DynamoDBConstants.WRITE_THROUGHPUT, writeThroughput.toString)
setOptionalConf(
jobConf,
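
Symmetrically, the writer now reports where its throughput value came from. In a hypothetical run with no writeThroughput configured against an on-demand target table, the derived default from the documentation above would be logged:

    Setting up Hadoop job to write table using a calculated throughput of 40000 WCU(s)
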
Original file line number Diff line number Diff line change
@@ -16,11 +16,16 @@ class DynamoDBInputFormatTest extends munit.FunSuite {
}

test("no configured scanSegments in on-demand billing mode and table size is 100 GB") {
checkPartitions(1024)(tableSizeBytes = 100 * GB, tableProvisionedThroughput = None)
// segments are limited by the default read throughput
checkPartitions(200)(tableSizeBytes = 100 * GB, tableProvisionedThroughput = None)
}

test("no configured scanSegments in on-demand billing mode, table size is 100 GB, and read throughput is 1,000,000") {
checkPartitions(1024)(tableSizeBytes = 100 * GB, tableProvisionedThroughput = None, configuredReadThroughput = Some(1000000))
}

test("no configured scanSegments in provisioned billing mode") {
checkPartitions(10)(tableSizeBytes = 1 * GB, tableProvisionedThroughput = Some((25, 25)))
checkPartitions(10)(tableSizeBytes = 1 * GB, tableProvisionedThroughput = Some((10000, 10000)))
}

test("scanSegments = 42") {
