Skip to content

Commit

Permalink
Lazy clean up dangling index metadata log entry (#558)
Browse files Browse the repository at this point in the history
* Enhance withTx API with UT

Signed-off-by: Chen Dai <[email protected]>

* Refactor UT

Signed-off-by: Chen Dai <[email protected]>

* Support cleanup for create index

Signed-off-by: Chen Dai <[email protected]>

* Add IT

Signed-off-by: Chen Dai <[email protected]>

* Fix broken UT

Signed-off-by: Chen Dai <[email protected]>

* Skip corrupt check for creating and vacuuming index

Signed-off-by: Chen Dai <[email protected]>

* Add more IT

Signed-off-by: Chen Dai <[email protected]>

* Refactor nested conditional statements by match case

Signed-off-by: Chen Dai <[email protected]>

---------

Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen authored Oct 15, 2024
1 parent 0b106f1 commit 8461ff9
Show file tree
Hide file tree
Showing 5 changed files with 363 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
IGNORE_DOC_ID_COLUMN.optionKey -> "true").asJava)

/** Flint client for low-level index operation */
private val flintClient: FlintClient = FlintClientBuilder.build(flintSparkConf.flintOptions())
override protected val flintClient: FlintClient =
FlintClientBuilder.build(flintSparkConf.flintOptions())

private val flintIndexMetadataService: FlintIndexMetadataService = {
FlintIndexMetadataServiceBuilder.build(flintSparkConf.flintOptions())
Expand Down Expand Up @@ -170,7 +171,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
}
})
.commit(_ => indexRefresh.start(spark, flintSparkConf))
}
}.flatten

/**
* Describe all Flint indexes whose name matches the given pattern.
Expand Down Expand Up @@ -242,7 +243,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
case (true, false) => updateIndexManualToAuto(index, tx)
case (false, false) => updateIndexAutoToManual(index, tx)
}
}
}.flatten
}

/**
Expand Down Expand Up @@ -276,7 +277,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
logInfo("Flint index to be deleted doesn't exist")
false
}
}
}.getOrElse(false)

/**
* Delete a Flint index physically.
Expand Down Expand Up @@ -319,7 +320,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
logInfo("Flint index to vacuum doesn't exist")
false
}
}
}.getOrElse(false)

/**
* Recover index job.
Expand Down Expand Up @@ -356,24 +357,10 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
true
})
} else {
logInfo("Index to be recovered either doesn't exist or not auto refreshed")
if (index.isEmpty) {
/*
* If execution reaches this point, it indicates that the Flint index is corrupted.
* In such cases, clean up the metadata log, as the index data no longer exists.
* There is a very small possibility that users may recreate the index in the
* interim, but metadata log get deleted by this cleanup process.
*/
logWarning("Cleaning up metadata log as index data has been deleted")
tx
.initialLog(_ => true)
.finalLog(_ => NO_LOG_ENTRY)
.commit(_ => { false })
} else {
false
}
logInfo("Index to be recovered is not auto refreshed")
false
}
}
}.getOrElse(false)

/**
* Build data frame for querying the given index. This is mostly for unit test convenience.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,18 +159,22 @@ class FlintSparkIndexMonitor(
override def run(): Unit = {
logInfo(s"Scheduler trigger index monitor task for $indexName")
try {
if (isStreamingJobActive(indexName)) {
logInfo("Streaming job is still active")
flintMetadataLogService.recordHeartbeat(indexName)
val isJobActive = isStreamingJobActive(indexName)
val indexExists = flintClient.exists(indexName)

if (!flintClient.exists(indexName)) {
logWarning("Streaming job is active but data is deleted")
(isJobActive, indexExists) match {
case (true, true) =>
logInfo("Streaming job is active and index exists")
flintMetadataLogService.recordHeartbeat(indexName)

case (true, false) =>
logWarning("Streaming job is active but index is deleted")
stopStreamingJobAndMonitor(indexName)
}
} else {
logError("Streaming job is not active. Cancelling monitor task")
stopMonitor(indexName)
logInfo("Index monitor task is cancelled")

case (false, _) =>
logError("Streaming job is not active. Cancelling monitor task")
stopMonitor(indexName)
logInfo("Index monitor task is cancelled")
}
errorCnt = 0 // Reset counter if no error
} catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,33 @@
package org.opensearch.flint.spark

import org.opensearch.flint.common.metadata.log.{FlintMetadataLogService, OptimisticTransaction}
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.{CREATING, EMPTY, VACUUMING}
import org.opensearch.flint.common.metadata.log.OptimisticTransaction.NO_LOG_ENTRY
import org.opensearch.flint.core.FlintClient

import org.apache.spark.internal.Logging

/**
* Provides transaction support with proper error handling and logging capabilities.
*
* @note
* This trait requires the mixing class to extend Spark's `Logging` to utilize its logging
* functionalities. Meanwhile it needs to provide `FlintClient` and data source name so this
* trait can help create transaction context.
* This trait requires the mixing class to provide both `FlintClient` and
* `FlintMetadataLogService` so this trait can help create transaction context.
*/
trait FlintSparkTransactionSupport { self: Logging =>
trait FlintSparkTransactionSupport extends Logging {

/** Flint client defined in the mixing class */
protected def flintClient: FlintClient

/** Flint metadata log service defined in the mixing class */
protected def flintMetadataLogService: FlintMetadataLogService

/**
* Executes a block of code within a transaction context, handling and logging errors
* appropriately. This method logs the start and completion of the transaction and captures any
* exceptions that occur, enriching them with detailed error messages before re-throwing.
* exceptions that occur, enriching them with detailed error messages before re-throwing. If the
* index data is missing (excluding index creation actions), the operation is bypassed, and any
* dangling metadata log entries are cleaned up.
*
* @param indexName
* the name of the index on which the operation is performed
Expand All @@ -39,19 +46,31 @@ trait FlintSparkTransactionSupport { self: Logging =>
* @tparam T
* the type of the result produced by the operation block
* @return
* the result of the operation block
* Some(result) of the operation block if the operation is executed, or None if the operation
* execution is bypassed because the index is corrupted
*/
def withTransaction[T](indexName: String, opName: String, forceInit: Boolean = false)(
opBlock: OptimisticTransaction[T] => T): T = {
opBlock: OptimisticTransaction[T] => T): Option[T] = {
logInfo(s"Starting index operation [$opName $indexName] with forceInit=$forceInit")
try {
// Create transaction (only have side effect if forceInit is true)
val tx: OptimisticTransaction[T] =
flintMetadataLogService.startTransaction(indexName, forceInit)
val isCorrupted = isIndexCorrupted(indexName)
if (isCorrupted) {
cleanupCorruptedIndex(indexName)
}

// Execute the action if create index action (indicated by forceInit) or not corrupted
if (forceInit || !isCorrupted) {

val result = opBlock(tx)
logInfo(s"Index operation [$opName $indexName] complete")
result
// Create transaction (only have side effect if forceInit is true)
val tx: OptimisticTransaction[T] =
flintMetadataLogService.startTransaction(indexName, forceInit)
val result = opBlock(tx)
logInfo(s"Index operation [$opName $indexName] complete")
Some(result)
} else {
logWarning(s"Bypassing index operation [$opName $indexName]")
None
}
} catch {
case e: Exception =>
logError(s"Failed to execute index operation [$opName $indexName]", e)
Expand All @@ -60,4 +79,42 @@ trait FlintSparkTransactionSupport { self: Logging =>
throw e
}
}

/**
* Determines if the index is corrupted, meaning metadata log entry exists but the corresponding
* data index does not. For indexes creating or vacuuming, the check for a corrupted index is
* skipped to reduce the possibility of a race condition. This is because the index may be in a
* transitional phase where the data index is temporarily missing before the process completes.
*/
private def isIndexCorrupted(indexName: String): Boolean = {
  // Latest metadata log entry recorded for this index, if there is one
  val latestEntry =
    flintMetadataLogService.getIndexMetadataLog(indexName).flatMap(_.getLatest)
  val hasLogEntry = latestEntry.isPresent

  // Data index existence is looked up on every call, regardless of the log entry lookup result
  val hasDataIndex = flintClient.exists(indexName)

  // A log entry in one of these states is exempt from the corruption check
  val inTransition =
    latestEntry
      .filter(entry =>
        entry.state match {
          case EMPTY | CREATING | VACUUMING => true
          case _ => false
        })
      .isPresent

  // Corrupted = log entry present, data index absent, and not in a transitional state
  val corrupted = !(!hasLogEntry || hasDataIndex || inTransition)

  if (corrupted) {
    logWarning(s"""
         | Cleaning up corrupted index:
         | - logEntryExists [$hasLogEntry]
         | - dataIndexExists [$hasDataIndex]
         | - isCreatingOrVacuuming [$inTransition]
         |""".stripMargin)
  }
  corrupted
}

/**
 * Runs a metadata log transaction on the given index whose final log is `NO_LOG_ENTRY`.
 *
 * The initial-log predicate accepts any entry it is given and the commit step performs no
 * work of its own, so the only effect requested here is the transition to the final log.
 *
 * NOTE(review): presumably `NO_LOG_ENTRY` makes the transaction remove the metadata log entry
 * instead of writing a new one -- confirm against `OptimisticTransaction`.
 *
 * @param indexName
 *   name of the index whose metadata log transaction is started
 */
private def cleanupCorruptedIndex(indexName: String): Unit = {
// The transaction is started without the forceInit argument that withTransaction passes
flintMetadataLogService
.startTransaction(indexName)
.initialLog(_ => true)
.finalLog(_ => NO_LOG_ENTRY)
.commit(_ => {})
}
}
Loading

0 comments on commit 8461ff9

Please sign in to comment.