Skip to content

Commit

Permalink
FallbackStorage retries FileNotFoundExceptions
Browse files Browse the repository at this point in the history
  • Loading branch information
EnricoMi committed Nov 28, 2024
1 parent 32232e9 commit 67cc348
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 9 deletions.
19 changes: 19 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,25 @@ package object config {
.checkValue(_.endsWith(java.io.File.separator), "Path should end with separator.")
.createOptional

private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_REPLICATION_DELAY =
ConfigBuilder("spark.storage.decommission.fallbackStorage.replicationDelay")
.doc("The maximum expected delay for files written by one executor to become " +
"available to other executors.")
.version("4.0.0")
.timeConf(TimeUnit.SECONDS)
.checkValue(_ > 0, "Value must be positive.")
.createOptional

private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_REPLICATION_WAIT =
ConfigBuilder("spark.storage.decommission.fallbackStorage.replicationWait")
.doc("When an executor cannot find a file in the fallback storage it waits " +
"this amount of time before attempting to open the file again, " +
f"while not exceeding ${STORAGE_DECOMMISSION_FALLBACK_STORAGE_REPLICATION_DELAY.key}.")
.version("4.0.0")
.timeConf(TimeUnit.SECONDS)
.checkValue(_ > 0, "Value must be positive.")
.createWithDefaultString("1s")

private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP =
ConfigBuilder("spark.storage.decommission.fallbackStorage.cleanUp")
.doc("If true, Spark cleans up its fallback storage data during shutting down.")
Expand Down
67 changes: 61 additions & 6 deletions core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,26 @@

package org.apache.spark.storage

import java.io.DataInputStream
import java.io.{DataInputStream, FileNotFoundException}
import java.nio.ByteBuffer

import scala.annotation.tailrec
import scala.concurrent.Future
import scala.reflect.ClassTag

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, Path}

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP, STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH}
import org.apache.spark.internal.config.{STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP, STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH, STORAGE_DECOMMISSION_FALLBACK_STORAGE_REPLICATION_DELAY, STORAGE_DECOMMISSION_FALLBACK_STORAGE_REPLICATION_WAIT}
import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcTimeout}
import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleBlockInfo}
import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
import org.apache.spark.util.Utils
import org.apache.spark.util.{Clock, SystemClock, Utils}

/**
* A fallback storage used by storage decommissioners.
Expand Down Expand Up @@ -137,6 +138,7 @@ private[spark] object FallbackStorage extends Logging {
val fallbackFileSystem = FileSystem.get(fallbackUri, hadoopConf)
// The fallback directory for this app may not be created yet.
if (fallbackFileSystem.exists(fallbackPath)) {
logInfo(s"Attempt to clean up: $fallbackUri")
if (fallbackFileSystem.delete(fallbackPath, true)) {
logInfo(s"Succeed to clean up: $fallbackUri")
} else {
Expand All @@ -154,6 +156,59 @@ private[spark] object FallbackStorage extends Logging {
FALLBACK_BLOCK_MANAGER_ID, blockId, StorageLevel.DISK_ONLY, memSize = 0, dataLength)
}

/**
* Open the file, retry a FileNotFoundException for waitMs milliseconds,
* unless this would exceed the deadline. In the latter case, rethrow the exception.
*/
@tailrec
private def open(filesystem: FileSystem,
path: Path,
deadlineMs: Long,
waitMs: Long,
clock: Clock) : FSDataInputStream = {
try {
filesystem.open(path)
} catch {
case fnf: FileNotFoundException =>
val waitTillMs = clock.getTimeMillis() + waitMs
if (waitTillMs <= deadlineMs) {
logInfo(f"File not found, waiting ${waitMs / 1000}s: $path")
clock.waitTillTime(waitTillMs)
open(filesystem, path, deadlineMs, waitMs, clock)
} else {
throw fnf
}
}
}

/**
* Open the file and retry FileNotFoundExceptions according to
* STORAGE_DECOMMISSION_FALLBACK_STORAGE_REPLICATION_DELAY and
* STORAGE_DECOMMISSION_FALLBACK_STORAGE_REPLICATION_WAIT
*/
// Visible for testing
private[spark] def open(conf: SparkConf,
filesystem: FileSystem,
path: Path,
clock: Clock = new SystemClock()): FSDataInputStream = {
val replicationDelay = conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_REPLICATION_DELAY)
if (replicationDelay.isDefined) {
val replicationDeadline = clock.getTimeMillis() + replicationDelay.get * 1000
val replicationWait = conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_REPLICATION_WAIT)
val replicationWaitMs = replicationWait * 1000
try {
open(filesystem, path, replicationDeadline, replicationWaitMs, clock)
} catch {
case fnf: FileNotFoundException =>
logInfo(f"File not found, exceeded expected replication delay " +
f"of ${replicationDelay.get}s: $path")
throw fnf
}
} else {
filesystem.open(path)
}
}

/**
* Read a ManagedBuffer.
*/
Expand All @@ -179,7 +234,7 @@ private[spark] object FallbackStorage extends Logging {
val indexFile = new Path(fallbackPath, s"$appId/$shuffleId/$hash/$name")
val start = startReduceId * 8L
val end = endReduceId * 8L
Utils.tryWithResource(fallbackFileSystem.open(indexFile)) { inputStream =>
Utils.tryWithResource(open(conf, fallbackFileSystem, indexFile)) { inputStream =>
Utils.tryWithResource(new DataInputStream(inputStream)) { index =>
index.skip(start)
val offset = index.readLong()
Expand All @@ -192,7 +247,7 @@ private[spark] object FallbackStorage extends Logging {
logDebug(s"To byte array $size")
val array = new Array[Byte](size.toInt)
val startTimeNs = System.nanoTime()
Utils.tryWithResource(fallbackFileSystem.open(dataFile)) { f =>
Utils.tryWithResource(open(conf, fallbackFileSystem, dataFile)) { f =>
f.seek(offset)
f.readFully(array)
logDebug(s"Took ${(System.nanoTime() - startTimeNs) / (1000 * 1000)}ms")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,21 @@
*/
package org.apache.spark.storage

import java.io.{DataOutputStream, File, FileOutputStream, InputStream, IOException}
import java.io.{DataOutputStream, File, FileNotFoundException, FileOutputStream, InputStream, IOException}
import java.nio.file.Files

import scala.concurrent.duration._
import scala.util.Random

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, LocalFileSystem, Path, PositionedReadable, Seekable}
import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, LocalFileSystem, Path, PositionedReadable, Seekable}
import org.mockito.{ArgumentMatchers => mc}
import org.mockito.Mockito.{mock, never, verify, when}
import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}

import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, TestUtils}
import org.apache.spark.LocalSparkContext.withSpark
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.config._
import org.apache.spark.launcher.SparkLauncher.{EXECUTOR_MEMORY, SPARK_MASTER}
import org.apache.spark.network.BlockTransferService
Expand All @@ -38,11 +39,12 @@ import org.apache.spark.scheduler.ExecutorDecommissionInfo
import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleBlockInfo}
import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
import org.apache.spark.util.Clock
import org.apache.spark.util.Utils.tryWithResource

class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {

def getSparkConf(initialExecutor: Int = 1, minExecutor: Int = 1): SparkConf = {
def getSparkConf(initialExecutor: Int = 1, minExecutor: Int = 1): SparkConf = {
new SparkConf(false)
.setAppName(getClass.getName)
.set(SPARK_MASTER, s"local-cluster[$initialExecutor,1,1024]")
Expand Down Expand Up @@ -333,7 +335,43 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
}
}
}

Seq(0, 1000, 3000, 6000).foreach { replicationMs =>
test(s"Consider replication delay - ${replicationMs}ms") {
val delay = 5000 // max allowed replication
val wait = 2000 // time between open file attempts
val conf = getSparkConf()
.set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_REPLICATION_DELAY.key, s"${delay}ms")
.set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_REPLICATION_WAIT.key, s"${wait}ms")

val filesystem = FileSystem.get(SparkHadoopUtil.get.newConfiguration(conf))
val path = new Path(conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).get, "file")
val startMs = 123000000L * 1000L // arbitrary system time
val clock = new DelayedActionClock(replicationMs, startMs)(filesystem.create(path).close())

if (replicationMs <= delay) {
// expect open to succeed
val in = FallbackStorage.open(conf, filesystem, path, clock)
assert(in != null)

// how many waits are expected to observe replication
val expectedWaits = Math.ceil(replicationMs.toFloat / wait).toInt
assert(clock.timeMs == startMs + expectedWaits * wait)
assert(clock.waited == expectedWaits)
in.close()
} else {
// expect open to fail
assertThrows[FileNotFoundException](FallbackStorage.open(conf, filesystem, path, clock))

// how many waits are expected to observe delay
val expectedWaits = delay / wait
assert(clock.timeMs == startMs + expectedWaits * wait)
assert(clock.waited == expectedWaits)
}
}
}
}

class ReadPartialInputStream(val in: FSDataInputStream) extends InputStream
with Seekable with PositionedReadable {
override def read: Int = in.read
Expand Down Expand Up @@ -377,3 +415,31 @@ class ReadPartialFileSystem extends LocalFileSystem {
new FSDataInputStream(new ReadPartialInputStream(stream))
}
}

class DelayedActionClock(delayMs: Long, startTimeMs: Long)(action: => Unit)
extends Clock {
var timeMs: Long = startTimeMs
var waited: Int = 0
var triggered: Boolean = false

if (delayMs == 0) trigger()

private def trigger(): Unit = {
if (!triggered) {
triggered = true
action
}
}

override def getTimeMillis(): Long = timeMs
override def nanoTime(): Long = timeMs * 1000000
override def waitTillTime(targetTime: Long): Long = {
waited += 1
if (targetTime >= startTimeMs + delayMs) {
timeMs = startTimeMs + delayMs
trigger()
}
timeMs = targetTime
targetTime
}
}

0 comments on commit 67cc348

Please sign in to comment.