diff --git a/config.yaml.example b/config.yaml.example
index 616b5c83..ca5360df 100644
--- a/config.yaml.example
+++ b/config.yaml.example
@@ -28,6 +28,10 @@ source:
   # Preserve TTLs and WRITETIMEs of cells in the source database. Note that this
   # option is *incompatible* when copying tables with collections (lists, maps, sets).
   preserveTimestamps: true
+  # Optional default TTL in seconds, applied when preserveTimestamps is enabled
+  # and the original TTL of a cell is null; ignored if 0 or unset. The final TTL
+  # is set relative to the cell's WRITETIME (assumed UTC): defaultTTL - (now - writetime).
+  #defaultTTL: 0
   # Number of splits to use - this should be at minimum the amount of cores
   # available in the Spark cluster, and optimally more; higher splits will lead
   # to more fine-grained resumes. Aim for 8 * (Spark cores).
diff --git a/src/main/scala/com/scylladb/migrator/config/SourceSettings.scala b/src/main/scala/com/scylladb/migrator/config/SourceSettings.scala
index e4dd1229..34a1dc9a 100644
--- a/src/main/scala/com/scylladb/migrator/config/SourceSettings.scala
+++ b/src/main/scala/com/scylladb/migrator/config/SourceSettings.scala
@@ -27,6 +27,7 @@ object SourceSettings {
                        connections: Option[Int],
                        fetchSize: Int,
                        preserveTimestamps: Boolean,
+                       defaultTTL: Option[Long],
                        where: Option[String])
       extends SourceSettings
   case class DynamoDB(endpoint: Option[DynamoDBEndpoint],
diff --git a/src/main/scala/com/scylladb/migrator/readers/Cassandra.scala b/src/main/scala/com/scylladb/migrator/readers/Cassandra.scala
index b2dbd11a..1bfcb2fd 100644
--- a/src/main/scala/com/scylladb/migrator/readers/Cassandra.scala
+++ b/src/main/scala/com/scylladb/migrator/readers/Cassandra.scala
@@ -15,6 +15,7 @@ import org.apache.spark.sql.types.{ IntegerType, LongType, StructField, StructTy
 import org.apache.spark.sql.{ DataFrame, Row, SparkSession }
 import org.apache.spark.unsafe.types.UTF8String
 
+import java.time.Instant
 import scala.collection.mutable.ArrayBuffer
 
 object Cassandra {
@@ -92,7 +93,8 @@ object Cassandra {
   def explodeRow(row: Row,
                  schema: StructType,
                  primaryKeyOrdinals: Map[String, Int],
-                 regularKeyOrdinals: Map[String, (Int, Int, Int)]) =
+                 regularKeyOrdinals: Map[String, (Int, Int, Int)],
+                 defaultTTL: Long) =
     if (regularKeyOrdinals.isEmpty) List(row)
     else {
       val rowTimestampsToFields =
@@ -126,7 +128,16 @@ object Cassandra {
 
       timestampsToFields
         .map {
-          case ((ttl, writetime), fields) =>
+          case ((ttl, writetime), fields) => {
+            val writetimestamp = writetime.getOrElse(CassandraOption.Unset)
+            val baseTTL = if (defaultTTL > 0) defaultTTL else 0L
+            val deltaTTL = if (writetimestamp == CassandraOption.Unset) {
+              0L
+            } else {
+              baseTTL - (System.currentTimeMillis - (writetimestamp.asInstanceOf[Long] / 1000)) / 1000
+            }
+            val finalttl =
+              if (writetimestamp != CassandraOption.Unset && (deltaTTL > 0)) deltaTTL else 0L
             val newValues = schema.fields.map { field =>
               primaryKeyOrdinals
                 .get(field.name)
@@ -135,9 +146,10 @@ object Cassandra {
                 .flatMap { ord =>
                   if (row.isNullAt(ord)) None
                   else Some(row.get(ord))
                 }
                 .getOrElse(fields.getOrElse(field.name, CassandraOption.Unset))
-            } ++ Seq(ttl.getOrElse(0L), writetime.getOrElse(CassandraOption.Unset))
+            } ++ Seq(ttl.getOrElse(finalttl), writetimestamp)
             Row(newValues: _*)
+          }
         }
     }
@@ -168,7 +180,8 @@ object Cassandra {
     df: DataFrame,
     timestampColumns: Option[TimestampColumns],
     origSchema: StructType,
-    tableDef: TableDef): DataFrame =
+    tableDef: TableDef,
+    defaultTTL: Long): DataFrame =
     timestampColumns match {
       case None => df
       case Some(TimestampColumns(ttl, writeTime)) =>
@@ -193,7 +206,8 @@ object Cassandra {
             _,
             broadcastSchema.value,
             broadcastPrimaryKeyOrdinals.value,
-            broadcastRegularKeyOrdinals.value)
+            broadcastRegularKeyOrdinals.value,
+            defaultTTL)
         }(RowEncoder(finalSchema))
     }

@@ -269,7 +283,8 @@ object Cassandra {
       spark.createDataFrame(rdd, selection.schema),
       selection.timestampColumns,
       origSchema,
-      tableDef
+      tableDef,
+      source.defaultTTL.getOrElse(0L)
     )

     SourceDataFrame(resultingDataframe, selection.timestampColumns, true)
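
Note on the TTL arithmetic: the sketch below is an illustrative, standalone rendering of the deltaTTL/finalttl computation added to explodeRow, not code from the migrator (the object and method names are invented for the example). It assumes, as the patch does, that WRITETIME values are microseconds since the Unix epoch in UTC, and that a computed TTL of 0 means the cell is written without a TTL.

// Standalone sketch of the TTL recomputation (hypothetical names).
object DefaultTtlSketch {
  // defaultTTL is in seconds; writetimeMicros is a Cassandra WRITETIME
  // (microseconds since the epoch, UTC); nowMillis is the current wall clock.
  def remainingTtl(defaultTTL: Long, writetimeMicros: Option[Long], nowMillis: Long): Long =
    writetimeMicros match {
      case Some(micros) if defaultTTL > 0 =>
        // Seconds elapsed since the original write...
        val elapsedSeconds = (nowMillis - micros / 1000) / 1000
        // ...subtracted from the configured default.
        val delta = defaultTTL - elapsedSeconds
        if (delta > 0) delta else 0L // already past defaultTTL: no TTL is set
      case _ => 0L // no WRITETIME recorded, or the feature is disabled
    }

  def main(args: Array[String]): Unit = {
    val now = System.currentTimeMillis
    val tenDaysAgoMicros = Some((now - 10L * 24 * 3600 * 1000) * 1000)
    // A 30-day defaultTTL on a cell written 10 days ago leaves roughly
    // 20 days (1728000 seconds) of TTL.
    println(remainingTtl(30L * 24 * 3600, tenDaysAgoMicros, now))
  }
}

Both divisions truncate toward zero, so the recomputed TTL is accurate to within a second of the configured default.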