From f73ef8434bec25990a903b8dab7de61a16f596df Mon Sep 17 00:00:00 2001
From: Tuan Pham
Date: Thu, 5 Sep 2024 21:29:29 +1000
Subject: [PATCH 1/2] Add getUpdatedPartitions and optimizeUpdatedPartitions
 helper

---
 .../scala/mrpowers/jodie/DeltaHelpers.scala   | 40 +++++++++++++++++++
 1 file changed, 40 insertions(+)

diff --git a/src/main/scala/mrpowers/jodie/DeltaHelpers.scala b/src/main/scala/mrpowers/jodie/DeltaHelpers.scala
index faed7e3..8c9b445 100644
--- a/src/main/scala/mrpowers/jodie/DeltaHelpers.scala
+++ b/src/main/scala/mrpowers/jodie/DeltaHelpers.scala
@@ -9,7 +9,9 @@ import org.apache.spark.sql.delta.DeltaLog
 import org.apache.spark.sql.delta.actions.AddFile
 import org.apache.spark.sql.expressions.Window.partitionBy
 import org.apache.spark.sql.functions._
+import org.apache.spark.sql.types.StringType
 
+import java.time.Instant
 import scala.collection.mutable
 
 object DeltaHelpers {
@@ -125,6 +127,44 @@
       }
     }
 
+  /**
+   * Returns the partition values that were updated in a Delta table within a given time range. This is particularly
+   * useful when you want to find the partitions affected by append/merge operations so that you can run a compaction
+   * or vacuum operation on just those partitions.
+   */
+  def getUpdatedPartitions(path: String, startTime: Option[Instant] = None, endTime: Option[Instant] = None): Array[Map[String, String]] = {
+    DeltaLog
+      .forTable(SparkSession.active, path)
+      .snapshot
+      .allFiles
+      .filter(a => startTime.forall(a.modificationTime >= _.toEpochMilli) && endTime.forall(a.modificationTime <= _.toEpochMilli))
+      .withColumn("partitionValuesString", col("partitionValues").cast(StringType))
+      .dropDuplicates("partitionValues")
+      .select("partitionValues")
+      .collect()
+      .map(_.getAs[Map[String, String]](0))
+  }
+
+  /**
+   * Runs OPTIMIZE on recently updated partitions of a Delta table. The partitions updated between startTime and
+   * endTime are determined automatically and compacted; if zOrderCols are provided, a Z-Order operation is run instead.
+   */
+  def optimizeUpdatedPartitions(path: String, startTime: Option[Instant], endTime: Option[Instant], zOrderCols: Option[Seq[String]] = None): DataFrame = {
+    val updatedPartitionsCondition = getUpdatedPartitions(path, startTime, endTime)
+      .map(_.map { case (p, v) => s"$p = '$v'" }.mkString("(", " AND ", ")"))
+      .mkString(" OR ")
+
+    val optimizeBuilder = DeltaTable
+      .forPath(path)
+      .optimize()
+      .where(updatedPartitionsCondition)
+
+    zOrderCols match {
+      case Some(cols) => optimizeBuilder.executeZOrderBy(cols: _*)
+      case None => optimizeBuilder.executeCompaction()
+    }
+  }
+
   def getShuffleFileMetadata(path: String, condition: String):
   (Seq[AddFile], Seq[AddFile], Seq[AddFile], Seq[AddFile], Seq[AddFile], DataFrame, Seq[String]) = {
     val (deltaLog, unresolvedColumns, targetOnlyPredicates, minMaxOnlyExpressions, equalOnlyExpressions, otherExpressions, removedPredicates) = getResolvedExpressions(path, condition)
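Note (review): a minimal usage sketch of the two helpers added in this patch, assuming an active
SparkSession and a partitioned Delta table at the hypothetical path /tmp/events; the one-hour
lookback window and the Z-Order column "id" are illustrative only:

    import java.time.Instant
    import mrpowers.jodie.DeltaHelpers

    val start = Instant.now().minusSeconds(3600) // illustrative one-hour lookback
    val end   = Instant.now()

    // Distinct partition values touched by writes in [start, end],
    // e.g. Array(Map("date" -> "2024-09-05"))
    val touched = DeltaHelpers.getUpdatedPartitions("/tmp/events", Some(start), Some(end))

    // Compact only those partitions; the returned DataFrame carries the OPTIMIZE metrics
    val compacted = DeltaHelpers.optimizeUpdatedPartitions("/tmp/events", Some(start), Some(end))

    // Or Z-Order the touched partitions instead of plain compaction
    val zOrdered = DeltaHelpers.optimizeUpdatedPartitions("/tmp/events", Some(start), Some(end), Some(Seq("id")))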
From ee2a2177a08bc661352cec093894d01fb4ef9461 Mon Sep 17 00:00:00 2001
From: Tuan Pham
Date: Sun, 8 Sep 2024 21:56:03 +1000
Subject: [PATCH 2/2] Add Tests

---
 .../scala/mrpowers/jodie/DeltaHelpers.scala   |   2 +-
 .../mrpowers/jodie/DeltaHelperSpec.scala      | 104 +++++++++++++++++-
 2 files changed, 104 insertions(+), 2 deletions(-)

diff --git a/src/main/scala/mrpowers/jodie/DeltaHelpers.scala b/src/main/scala/mrpowers/jodie/DeltaHelpers.scala
index 8c9b445..83440b2 100644
--- a/src/main/scala/mrpowers/jodie/DeltaHelpers.scala
+++ b/src/main/scala/mrpowers/jodie/DeltaHelpers.scala
@@ -139,7 +139,7 @@ object DeltaHelpers {
       .allFiles
       .filter(a => startTime.forall(a.modificationTime >= _.toEpochMilli) && endTime.forall(a.modificationTime <= _.toEpochMilli))
       .withColumn("partitionValuesString", col("partitionValues").cast(StringType))
-      .dropDuplicates("partitionValues")
+      .dropDuplicates("partitionValuesString")
       .select("partitionValues")
       .collect()
       .map(_.getAs[Map[String, String]](0))
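Note (review): the one-line change above is needed because Spark cannot group or deduplicate on a
MapType column; dropDuplicates("partitionValues") from patch 1 fails at analysis time, while
deduplicating on the stringified copy works. A standalone illustration, assuming a DataFrame df
with a hypothetical map column m:

    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.types.StringType

    // df.dropDuplicates("m") throws an AnalysisException when "m" is a MapType column,
    // so deduplicate on a comparable string rendering of the map instead:
    val deduped = df
      .withColumn("mString", col("m").cast(StringType))
      .dropDuplicates("mString")
      .drop("mString")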
diff --git a/src/test/scala/mrpowers/jodie/DeltaHelperSpec.scala b/src/test/scala/mrpowers/jodie/DeltaHelperSpec.scala
index aff4ef2..ab3738c 100644
--- a/src/test/scala/mrpowers/jodie/DeltaHelperSpec.scala
+++ b/src/test/scala/mrpowers/jodie/DeltaHelperSpec.scala
@@ -4,12 +4,14 @@ import com.github.mrpowers.spark.daria.sql.SparkSessionExt.SparkSessionMethods
 import com.github.mrpowers.spark.fast.tests.DataFrameComparer
 import io.delta.tables.DeltaTable
 import mrpowers.jodie.delta.DeltaConstants._
-import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.delta.DeltaLog
 import org.apache.spark.sql.types.{IntegerType, StringType}
 import org.scalatest.BeforeAndAfterEach
 import org.scalatest.funspec.AnyFunSpec
 import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, equal}
 
+import java.time.Instant
 import scala.collection.mutable
 import scala.util.{Failure, Success, Try}
 
@@ -472,6 +474,106 @@ class DeltaHelperSpec
     }
   }
 
+  describe("get updated partitions and optimize those partitions") {
+    it("should get updated partitions without time filter") {
+      val path = (os.pwd / "tmp" / "delta-get-updated-partition-without-time-filter").toString()
+
+      createBaseDeltaTableWithPartitions(path, Seq("firstname", "lastname"), partRows)
+      Seq(
+        (8, "Benito", "Jackson"),
+        (9, "Maria", "Willis"),
+        (10, "Percy", "Jackson")
+      )
+        .toDF("id", "firstname", "lastname")
+        .write
+        .format("delta")
+        .mode("append")
+        .option("delta.logRetentionDuration", "interval 30 days")
+        .save(path)
+
+      val actualUpdatedPartitions = DeltaHelpers.getUpdatedPartitions(path)
+      val expectedUpdatedPartitions = Array(
+        Map("firstname" -> "Benito", "lastname" -> "Jackson"),
+        Map("firstname" -> "Gabriela", "lastname" -> "Travolta"),
+        Map("firstname" -> "Jose", "lastname" -> "Travolta"),
+        Map("firstname" -> "Maria", "lastname" -> "Pitt"),
+        Map("firstname" -> "Maria", "lastname" -> "Willis"),
+        Map("firstname" -> "Patricia", "lastname" -> "Jackson"),
+        Map("firstname" -> "Percy", "lastname" -> "Jackson")
+      )
+
+      assert(actualUpdatedPartitions.sameElements(expectedUpdatedPartitions))
+    }
+
+    it("should get updated partitions with time filter") {
+      val path = (os.pwd / "tmp" / "delta-get-updated-partition-with-time-filter").toString()
+
+      createBaseDeltaTableWithPartitions(path, Seq("firstname", "lastname"), partRows)
+
+      val startTime = Instant.now()
+      Seq(
+        (8, "Benito", "Jackson"),
+        (9, "Maria", "Willis"),
+        (10, "Percy", "Jackson")
+      )
+        .toDF("id", "firstname", "lastname")
+        .write
+        .format("delta")
+        .mode("append")
+        .option("delta.logRetentionDuration", "interval 30 days")
+        .save(path)
+      val endTime = Instant.now()
+
+      Seq(
+        (3, "Jose", "Travolta"),
+        (4, "Patricia", "Jackson")
+      )
+        .toDF("id", "firstname", "lastname")
+        .write
+        .format("delta")
+        .mode("append")
+        .option("delta.logRetentionDuration", "interval 30 days")
+        .save(path)
+
+      val expectedUpdatedPartitions = Array(
+        Map("firstname" -> "Benito", "lastname" -> "Jackson"),
+        Map("firstname" -> "Maria", "lastname" -> "Willis"),
+        Map("firstname" -> "Percy", "lastname" -> "Jackson")
+      )
+
+
+      val actualUpdatedPartitions = DeltaHelpers.getUpdatedPartitions(path, Some(startTime), Some(endTime))
+      assert(actualUpdatedPartitions.sameElements(expectedUpdatedPartitions))
+    }
+
+    it("should optimize updated partitions") {
+      val path = (os.pwd / "tmp" / "delta-optimize-updated-partitions").toString()
+
+      createBaseDeltaTableWithPartitions(path, Seq("firstname", "lastname"), partRows)
+
+
+      val startTime = Instant.now()
+      Seq(
+        (8, "Benito", "Jackson"),
+        (9, "Maria", "Willis"),
+        (10, "Percy", "Jackson"),
+        (11, "Benito", "Jackson"),
+        (12, "Jose", "Travolta")
+      )
+        .toDF("id", "firstname", "lastname")
+        .write
+        .format("delta")
+        .mode("append")
+        .option("delta.logRetentionDuration", "interval 30 days")
+        .save(path)
+      val endTime = Instant.now()
+
+      val optimizedDf = DeltaHelpers.optimizeUpdatedPartitions(path, Some(startTime), Some(endTime))
+      val numOptimizedPartition = optimizedDf.select("metrics.partitionsOptimized").first().getLong(0)
+      assert(numOptimizedPartition == 3)
+    }
+  }
+
   describe("Validate and append data to a delta table"){
     it("should append dataframes with optional columns"){
       val path = (os.pwd / "tmp" / "delta-lake-validate-append-valid-dataframe").toString()
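Note (review): one edge case worth covering in a follow-up: when no files fall inside
[startTime, endTime], getUpdatedPartitions returns an empty array, so optimizeUpdatedPartitions
passes an empty string to where(), which will fail to parse. Partition values containing single
quotes would similarly break the generated predicate. A possible guard, sketched against the
patched code (untested; returning an empty DataFrame is just one option):

    def optimizeUpdatedPartitions(path: String, startTime: Option[Instant], endTime: Option[Instant],
                                  zOrderCols: Option[Seq[String]] = None): DataFrame = {
      val updatedPartitions = getUpdatedPartitions(path, startTime, endTime)
      if (updatedPartitions.isEmpty) {
        // Nothing changed in the window: skip building an OPTIMIZE command
        // with an unparseable empty WHERE predicate.
        SparkSession.active.emptyDataFrame
      } else {
        val condition = updatedPartitions
          .map(_.map { case (p, v) => s"$p = '$v'" }.mkString("(", " AND ", ")"))
          .mkString(" OR ")
        val builder = DeltaTable.forPath(path).optimize().where(condition)
        zOrderCols match {
          case Some(cols) => builder.executeZOrderBy(cols: _*)
          case None       => builder.executeCompaction()
        }
      }
    }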