Add getUpdatedPartitions and optimizeUpdatedPartitions helper #85

Open · wants to merge 2 commits into base: main
40 changes: 40 additions & 0 deletions src/main/scala/mrpowers/jodie/DeltaHelpers.scala
@@ -9,7 +9,9 @@ import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.delta.actions.AddFile
import org.apache.spark.sql.expressions.Window.partitionBy
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StringType

import java.time.Instant
import scala.collection.mutable

object DeltaHelpers {
@@ -125,6 +127,44 @@ object DeltaHelpers {
}
}

/**
 * Gets the partitions of a Delta table that were updated within a given time range. This is particularly
 * useful when you want to find partitions affected by append/merge operations and run a compaction or
 * vacuum operation on them.
 */
def getUpdatedPartitions(path: String, startTime: Option[Instant] = None, endTime: Option[Instant] = None): Array[Map[String, String]] = {
DeltaLog
.forTable(SparkSession.active, path)
.snapshot
.allFiles
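// Keep only AddFile entries whose modificationTime falls within the optional [startTime, endTime] window.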
.filter(a => startTime.forall(a.modificationTime >= _.toEpochMilli) && endTime.forall(a.modificationTime <= _.toEpochMilli))
.withColumn("partitionValuesString", col("partitionValues").cast(StringType))
.dropDuplicates("partitionValuesString")
Author: we cast partitionValues to string before dropDuplicates because calling dropDuplicates directly on the column, which has type Map, results in an error:

Cannot have map type columns in DataFrame which calls set operations(intersect, except, etc.), but the type of column partitionValues is map<string,string>;
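A minimal sketch of the limitation, assuming a SparkSession named `spark` with implicits imported (the DataFrame is a toy stand-in, not the PR's code):

import org.apache.spark.sql.functions.{col, map}
import org.apache.spark.sql.types.StringType
import spark.implicits._

val df = Seq(("a", "1"), ("a", "1")).toDF("k", "v")
  .select(map(col("k"), col("v")).as("partitionValues"))

// df.dropDuplicates("partitionValues") // AnalysisException: map type column in a set operation
df.withColumn("pv", col("partitionValues").cast(StringType))
  .dropDuplicates("pv") // fine once the map is cast to its string representation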

.select("partitionValues")
.collect()
.map(_.getAs[Map[String, String]](0))
}
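For illustration, a hypothetical call that collects the partitions touched over the last day; the table path and time window are assumptions, not part of this PR:

import java.time.Instant
import java.time.temporal.ChronoUnit

val recentPartitions = DeltaHelpers.getUpdatedPartitions(
  "/data/events", // hypothetical table path
  startTime = Some(Instant.now().minus(24, ChronoUnit.HOURS))
)
recentPartitions.foreach(println) // e.g. Map(date -> 2024-01-01, region -> eu)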

/**
 * Performs Delta table optimization on recently updated partitions. This function automatically determines
 * the partitions updated between startTime and endTime and runs a compaction operation on them. If zOrderCols
 * are provided, it runs a Z-Order operation instead.
 */
def optimizeUpdatedPartitions(path: String, startTime: Option[Instant], endTime: Option[Instant], zOrderCols: Option[Seq[String]] = None): DataFrame = {
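// Build a predicate string like "(p1 = 'v1' AND p2 = 'v2') OR (p1 = 'v3' AND p2 = 'v4')"
// covering every partition updated in the window. Note: this assumes at least one partition
// was updated; an empty result would produce an empty WHERE clause.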
val updatedPartitionsCondition = getUpdatedPartitions(path, startTime, endTime)
.map(_.map { case (p, v) => s"$p = '$v'" }.mkString("(", " AND ", ")"))
.mkString(" OR ")

val optimizeBuilder = DeltaTable
.forPath(path)
.optimize()
.where(updatedPartitionsCondition)

zOrderCols match {
case Some(cols) => optimizeBuilder.executeZOrderBy(cols: _*)
case None => optimizeBuilder.executeCompaction()
}
}
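A hypothetical usage sketch; the table path, time window, and Z-Order column below are assumptions:

import java.time.Instant
import java.time.temporal.ChronoUnit

val start = Some(Instant.now().minus(1, ChronoUnit.HOURS))

// Compact only the partitions updated in the last hour.
DeltaHelpers.optimizeUpdatedPartitions("/data/events", start, None)

// Same window, but Z-Order by a commonly filtered column instead of plain compaction.
DeltaHelpers.optimizeUpdatedPartitions("/data/events", start, None, Some(Seq("event_type")))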

def getShuffleFileMetadata(path: String, condition: String):
(Seq[AddFile], Seq[AddFile], Seq[AddFile], Seq[AddFile], Seq[AddFile], DataFrame, Seq[String]) = {
val (deltaLog, unresolvedColumns, targetOnlyPredicates, minMaxOnlyExpressions, equalOnlyExpressions, otherExpressions, removedPredicates) = getResolvedExpressions(path, condition)
104 changes: 103 additions & 1 deletion src/test/scala/mrpowers/jodie/DeltaHelperSpec.scala
@@ -4,12 +4,14 @@ import com.github.mrpowers.spark.daria.sql.SparkSessionExt.SparkSessionMethods
import com.github.mrpowers.spark.fast.tests.DataFrameComparer
import io.delta.tables.DeltaTable
import mrpowers.jodie.delta.DeltaConstants._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.types.{IntegerType, StringType}
import org.scalatest.BeforeAndAfterEach
import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, equal}

import java.time.Instant
import scala.collection.mutable
import scala.util.{Failure, Success, Try}

@@ -472,6 +474,106 @@ class DeltaHelperSpec
}
}

describe("get updated partitions and optimize those partitions") {
it("should get updated partition without time filter") {
val path = (os.pwd / "tmp" / "delta-get-updated-partition-without-time-filter").toString()

createBaseDeltaTableWithPartitions(path, Seq("firstname", "lastname"), partRows)
Seq(
(8, "Benito", "Jackson"),
(9, "Maria", "Willis"),
(10, "Percy", "Jackson")
)
.toDF("id", "firstname", "lastname")
.write
.format("delta")
.mode("append")
.option("delta.logRetentionDuration", "interval 30 days")
.save(path)

val actualUpdatedPartitions = DeltaHelpers.getUpdatedPartitions(path)
val expectedUpdatedPartitions = Array(
Map("firstname" -> "Benito", "lastname" -> "Jackson"),
Map("firstname" -> "Gabriela", "lastname" -> "Travolta"),
Map("firstname" -> "Jose", "lastname" -> "Travolta"),
Map("firstname" -> "Maria", "lastname" -> "Pitt"),
Map("firstname" -> "Maria", "lastname" -> "Willis"),
Map("firstname" -> "Patricia", "lastname" -> "Jackson"),
Map("firstname" -> "Percy", "lastname" -> "Jackson")
)

assert(actualUpdatedPartitions.sameElements(expectedUpdatedPartitions))
}

it("should get updated partition with time filter") {
val path = (os.pwd / "tmp" / "delta-get-updated-partition-without-time-filter").toString()

createBaseDeltaTableWithPartitions(path, Seq("firstname", "lastname"), partRows)

val startTime = Instant.now()
Seq(
(8, "Benito", "Jackson"),
(9, "Maria", "Willis"),
(10, "Percy", "Jackson")
)
.toDF("id", "firstname", "lastname")
.write
.format("delta")
.mode("append")
.option("delta.logRetentionDuration", "interval 30 days")
.save(path)
val endTime = Instant.now()

Seq(
(3, "Jose", "Travolta"),
(4, "Patricia", "Jackson")
)
.toDF("id", "firstname", "lastname")
.write
.format("delta")
.mode("append")
.option("delta.logRetentionDuration", "interval 30 days")
.save(path)

val expectedUpdatedPartitions = Array(
Map("firstname" -> "Benito", "lastname" -> "Jackson"),
Map("firstname" -> "Maria", "lastname" -> "Willis"),
Map("firstname" -> "Percy", "lastname" -> "Jackson")
)


val actualUpdatedPartitions = DeltaHelpers.getUpdatedPartitions(path, Some(startTime), Some(endTime))
assert(actualUpdatedPartitions.sameElements(expectedUpdatedPartitions))
}

it("should optimize updated partitions") {
val path = (os.pwd / "tmp" / "delta-optimize-updated-partitions").toString()

createBaseDeltaTableWithPartitions(path, Seq("firstname", "lastname"), partRows)


val startTime = Instant.now()
Seq(
(8, "Benito", "Jackson"),
(9, "Maria", "Willis"),
(10, "Percy", "Jackson"),
(11, "Benito", "Jackson"),
(12, "Jose", "Travolta")
)
.toDF("id", "firstname", "lastname")
.write
.format("delta")
.mode("append")
.option("delta.logRetentionDuration", "interval 30 days")
.save(path)
val endTime = Instant.now()

val optimizedDf = DeltaHelpers.optimizeUpdatedPartitions(path, Some(startTime), Some(endTime))
val numOptimizedPartition = optimizedDf.select("metrics.partitionsOptimized").first().getLong(0)
assert(numOptimizedPartition == 3)
}
}

describe("Validate and append data to a delta table"){
it("should append dataframes with optional columns"){
val path = (os.pwd / "tmp" / "delta-lake-validate-append-valid-dataframe").toString()