Commit ab5ca1a

add support to set default TTL as relative to existing writetimestamp and now (in UTC)

tarzanek committed Jan 27, 2022
1 parent b2caaeb commit ab5ca1a
Showing 3 changed files with 26 additions and 6 deletions.
4 changes: 4 additions & 0 deletions config.yaml.example
@@ -28,6 +28,10 @@ source:
# Preserve TTLs and WRITETIMEs of cells in the source database. Note that this
# option is *incompatible* when copying tables with collections (lists, maps, sets).
preserveTimestamps: true
+# Optional default TTL, in seconds, applied when preserveTimestamps is enabled and the
+# original TTL of a cell is null; if 0 or unset, it is ignored. The TTL actually written
+# is relative to the cell's writetimestamp: defaultTTL - (now - writetimestamp).
+# Writetimestamps are assumed to be in UTC.
+#defaultTTL: 0
# Number of splits to use - this should be at minimum the amount of cores
# available in the Spark cluster, and optimally more; higher splits will lead
# to more fine-grained resumes. Aim for 8 * (Spark cores).
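To make the documented arithmetic concrete, here is a small sketch in Scala (all values are made up for illustration; as in the reader code below, WRITETIME is in microseconds and System.currentTimeMillis in milliseconds):

    // Sketch of the rule above: final TTL = defaultTTL - (now - writetimestamp), in seconds.
    val defaultTTL = 864000L                // 10 days, as it would be set in config.yaml
    val writetimeMicros = 1642636800000000L // cell WRITETIME: 2022-01-20 00:00 UTC, microseconds
    val nowMillis = 1643241600000L          // "now": 2022-01-27 00:00 UTC, milliseconds

    val elapsedSeconds = (nowMillis - writetimeMicros / 1000) / 1000 // 604800 (7 days)
    val finalTTL = defaultTTL - elapsedSeconds                       // 259200 (3 days left)
    // A non-positive result (the cell is older than defaultTTL) is treated as "no TTL".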
1 change: 1 addition & 0 deletions src/main/scala/com/scylladb/migrator/config/SourceSettings.scala
@@ -27,6 +27,7 @@ object SourceSettings {
connections: Option[Int],
fetchSize: Int,
preserveTimestamps: Boolean,
+defaultTTL: Option[Long],
where: Option[String])
extends SourceSettings
case class DynamoDB(endpoint: Option[DynamoDBEndpoint],
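Because the new field is an Option[Long], an absent defaultTTL key in the YAML naturally degrades to the "ignored" behavior; a minimal sketch of the pattern (values illustrative):

    val defaultTTL: Option[Long] = None      // key omitted from config.yaml
    val effective = defaultTTL.getOrElse(0L) // 0 disables the feature, matching the
                                             // source.defaultTTL.getOrElse(0L) call site below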
27 changes: 21 additions & 6 deletions src/main/scala/com/scylladb/migrator/readers/Cassandra.scala
@@ -15,6 +15,7 @@ import org.apache.spark.sql.types.{ IntegerType, LongType, StructField, StructType }
import org.apache.spark.sql.{ DataFrame, Row, SparkSession }
import org.apache.spark.unsafe.types.UTF8String

+import java.time.Instant
import scala.collection.mutable.ArrayBuffer

object Cassandra {
@@ -92,7 +93,8 @@ object Cassandra
def explodeRow(row: Row,
schema: StructType,
primaryKeyOrdinals: Map[String, Int],
-regularKeyOrdinals: Map[String, (Int, Int, Int)]) =
+regularKeyOrdinals: Map[String, (Int, Int, Int)],
+defaultTTL: Long) =
if (regularKeyOrdinals.isEmpty) List(row)
else {
val rowTimestampsToFields =
@@ -126,7 +128,16 @@

timestampsToFields
.map {
-case ((ttl, writetime), fields) =>
+case ((ttl, writetime), fields) => {
+  val writetimestamp = writetime.getOrElse(CassandraOption.Unset)
+  val baseTTL = if (defaultTTL > 0) defaultTTL else 0L
+  val deltaTTL = if (writetimestamp == CassandraOption.Unset) {
+    0
+  } else {
+    baseTTL - (System.currentTimeMillis - writetimestamp.asInstanceOf[Long] / 1000) / 1000
+  }
+  val finalttl =
+    if (writetimestamp != CassandraOption.Unset && (deltaTTL > 0)) deltaTTL else 0L
val newValues = schema.fields.map { field =>
primaryKeyOrdinals
.get(field.name)
@@ -135,9 +146,10 @@
else Some(row.get(ord))
}
.getOrElse(fields.getOrElse(field.name, CassandraOption.Unset))
-} ++ Seq(ttl.getOrElse(0L), writetime.getOrElse(CassandraOption.Unset))
+} ++ Seq(ttl.getOrElse(finalttl), writetimestamp)

Row(newValues: _*)
+}
}
}
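The unit conversions in the deltaTTL expression above are easy to misread: WRITETIME values are microseconds, System.currentTimeMillis is milliseconds, and TTLs are seconds. A restatement of the same arithmetic with the conversions spelled out (the helper name is illustrative, not part of the commit):

    def remainingTTLSeconds(defaultTTLSeconds: Long, writetimeMicros: Long): Long = {
      val writetimeMillis = writetimeMicros / 1000 // microseconds -> milliseconds
      val elapsedSeconds =
        (System.currentTimeMillis - writetimeMillis) / 1000 // milliseconds -> seconds
      val remaining = defaultTTLSeconds - elapsedSeconds
      if (remaining > 0) remaining else 0L // 0 means "write without a TTL"
    }

Note that in the committed one-liner, writetimestamp.asInstanceOf[Long] / 1000 binds tighter than the subtraction, so it computes exactly this.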

@@ -168,7 +180,8 @@
df: DataFrame,
timestampColumns: Option[TimestampColumns],
origSchema: StructType,
-tableDef: TableDef): DataFrame =
+tableDef: TableDef,
+defaultTTL: Long): DataFrame =
timestampColumns match {
case None => df
case Some(TimestampColumns(ttl, writeTime)) =>
@@ -193,7 +206,8 @@
_,
broadcastSchema.value,
broadcastPrimaryKeyOrdinals.value,
-broadcastRegularKeyOrdinals.value)
+broadcastRegularKeyOrdinals.value,
+defaultTTL)
}(RowEncoder(finalSchema))

}
@@ -269,7 +283,8 @@
spark.createDataFrame(rdd, selection.schema),
selection.timestampColumns,
origSchema,
-tableDef
+tableDef,
+source.defaultTTL.getOrElse(0L)
)

SourceDataFrame(resultingDataframe, selection.timestampColumns, true)
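One subtlety in the explodeRow change above, in Seq(ttl.getOrElse(finalttl), writetimestamp): a cell's own TTL always takes precedence, and the computed finalttl is only the fallback for cells whose source TTL was null. A small sketch (values made up):

    val finalttl = 259200L                  // fallback computed from defaultTTL and writetime
    val cellTTL: Option[Long] = Some(3600L) // the cell already carries a 1-hour TTL
    cellTTL.getOrElse(finalttl)             // 3600: the original TTL is preserved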
