Skip to content

Commit

Permalink
[CELEBORN-1577][Phase2] QuotaManager should support interrupt shuffle.
Browse files Browse the repository at this point in the history
  • Loading branch information
leixm committed Oct 17, 2024
1 parent d5ca308 commit e70211f
Show file tree
Hide file tree
Showing 14 changed files with 965 additions and 182 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def estimatedPartitionSizeForEstimationUpdateInterval: Long =
get(ESTIMATED_PARTITION_SIZE_UPDATE_INTERVAL)
def masterResourceConsumptionInterval: Long = get(MASTER_RESOURCE_CONSUMPTION_INTERVAL)
// Byte threshold read from MASTER_USER_DISK_USAGE_THRESHOLD
// (celeborn.master.userResourceConsumption.user.threshold).
def masterUserDiskUsageThreshold: Long = get(MASTER_USER_DISK_USAGE_THRESHOLD)
// Byte threshold read from MASTER_CLUSTER_DISK_USAGE_THRESHOLD
// (celeborn.master.userResourceConsumption.cluster.threshold).
def masterClusterDiskUsageThreshold: Long = get(MASTER_CLUSTER_DISK_USAGE_THRESHOLD)
def clusterName: String = get(CLUSTER_NAME)

// //////////////////////////////////////////////////////
Expand Down Expand Up @@ -1061,6 +1063,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def registerShuffleFilterExcludedWorkerEnabled: Boolean =
get(REGISTER_SHUFFLE_FILTER_EXCLUDED_WORKER_ENABLED)

// Whether quota-based shuffle interruption is switched on
// (celeborn.quota.interruptShuffle.enabled).
def interruptShuffleEnabled: Boolean = get(QUOTA_INTERRUPT_SHUFFLE_ENABLED)

// //////////////////////////////////////////////////////
// Worker //
// //////////////////////////////////////////////////////
Expand Down Expand Up @@ -2841,6 +2845,26 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("30s")

// Master-category entry holding the per-user resource consumption threshold.
// The value is parsed as a byte size and defaults to Long.MaxValue, which the
// doc text below describes as disabling the check.
val MASTER_USER_DISK_USAGE_THRESHOLD: ConfigEntry[Long] =
buildConf("celeborn.master.userResourceConsumption.user.threshold")
.categories("master")
.doc("When user resource consumption exceeds quota, Master will " +
"interrupt some apps until user resource consumption is less " +
"than this value. Default value is Long.MaxValue which means disable check.")
.version("0.6.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(Long.MaxValue)

// Master-category entry holding the cluster-wide resource consumption threshold.
// The value is parsed as a byte size and defaults to Long.MaxValue, which the
// doc text below describes as disabling the check.
// NOTE(review): the key sits under "userResourceConsumption" although it is a
// cluster-level setting — confirm the naming is intentional.
val MASTER_CLUSTER_DISK_USAGE_THRESHOLD: ConfigEntry[Long] =
buildConf("celeborn.master.userResourceConsumption.cluster.threshold")
.categories("master")
.doc("When cluster resource consumption exceeds quota, Master will " +
"interrupt some apps until cluster resource consumption is less " +
"than this value. Default value is Long.MaxValue which means disable check.")
.version("0.6.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(Long.MaxValue)

val CLUSTER_NAME: ConfigEntry[String] =
buildConf("celeborn.cluster.name")
.categories("master", "worker")
Expand Down Expand Up @@ -5185,7 +5209,7 @@ object CelebornConf extends Logging {
.dynamic
.doc("Quota dynamic configuration for written disk bytes.")
.version("0.5.0")
.longConf
.bytesConf(ByteUnit.BYTE)
.createWithDefault(Long.MaxValue)

val QUOTA_DISK_FILE_COUNT: ConfigEntry[Long] =
Expand All @@ -5203,7 +5227,7 @@ object CelebornConf extends Logging {
.dynamic
.doc("Quota dynamic configuration for written hdfs bytes.")
.version("0.5.0")
.longConf
.bytesConf(ByteUnit.BYTE)
.createWithDefault(Long.MaxValue)

val QUOTA_HDFS_FILE_COUNT: ConfigEntry[Long] =
Expand Down Expand Up @@ -5765,4 +5789,51 @@ object CelebornConf extends Logging {
.booleanConf
.createWithDefault(false)

// Dynamic, quota-category entry: cluster-level quota for bytes written to disk.
// Parsed as a byte size; the default is Long.MaxValue
// (presumably "no limit" — confirm against the quota checker).
val QUOTA_CLUSTER_DISK_BYTES_WRITTEN: ConfigEntry[Long] =
buildConf("celeborn.quota.cluster.diskBytesWritten")
.categories("quota")
.dynamic
.doc("Quota dynamic configuration for cluster written disk bytes.")
.version("0.6.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(Long.MaxValue)

// Dynamic, quota-category entry: cluster-level quota for the number of files
// written to disk. A plain Long count (not a byte size); the default is Long.MaxValue
// (presumably "no limit" — confirm against the quota checker).
val QUOTA_CLUSTER_DISK_FILE_COUNT: ConfigEntry[Long] =
buildConf("celeborn.quota.cluster.diskFileCount")
.categories("quota")
.dynamic
.doc("Quota dynamic configuration for cluster written disk file count.")
.version("0.6.0")
.longConf
.createWithDefault(Long.MaxValue)

// Dynamic, quota-category entry: cluster-level quota for bytes written to HDFS.
// Parsed as a byte size; the default is Long.MaxValue
// (presumably "no limit" — confirm against the quota checker).
val QUOTA_CLUSTER_HDFS_BYTES_WRITTEN: ConfigEntry[Long] =
buildConf("celeborn.quota.cluster.hdfsBytesWritten")
.categories("quota")
.dynamic
.doc("Quota dynamic configuration for cluster written hdfs bytes.")
.version("0.6.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(Long.MaxValue)

// Dynamic, quota-category entry: cluster-level quota for the number of files
// written to HDFS. A plain Long count (not a byte size); the default is Long.MaxValue
// (presumably "no limit" — confirm against the quota checker).
val QUOTA_CLUSTER_HDFS_FILE_COUNT: ConfigEntry[Long] =
buildConf("celeborn.quota.cluster.hdfsFileCount")
.categories("quota")
.dynamic
.doc("Quota dynamic configuration for cluster written hdfs file count.")
.version("0.6.0")
.longConf
.createWithDefault(Long.MaxValue)

// Dynamic, quota-category switch for interrupting shuffles once tenant or cluster
// quotas are exceeded; disabled by default.
val QUOTA_INTERRUPT_SHUFFLE_ENABLED: ConfigEntry[Boolean] =
  buildConf("celeborn.quota.interruptShuffle.enabled")
    .categories("quota")
    .dynamic
    .doc(
      "If enabled, the resource consumption used by the tenant exceeds " +
        "celeborn.quota.tenant.xx, or the resource consumption of the entire " +
        "cluster exceeds celeborn.quota.cluster.xx, " +
        "some shuffles will be selected and interrupted.")
    .version("0.6.0")
    .booleanConf
    .createWithDefault(false)
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ case class ResourceConsumption(
hdfsFileCount: Long,
var subResourceConsumptions: util.Map[String, ResourceConsumption] = null) {

/**
 * Replaces this instance's `subResourceConsumptions` with the given map and returns
 * this same instance so the call can be chained.
 *
 * The receiver is mutated in place; no copy is created.
 *
 * @param resourceConsumptions the map to store as `subResourceConsumptions`
 * @return this instance, after the assignment
 */
def withSubResourceConsumptions(
resourceConsumptions: util.Map[String, ResourceConsumption]): ResourceConsumption = {
subResourceConsumptions = resourceConsumptions
this
}

def add(other: ResourceConsumption): ResourceConsumption = {
ResourceConsumption(
diskBytesWritten + other.diskBytesWritten,
Expand All @@ -38,6 +44,14 @@ case class ResourceConsumption(
hdfsFileCount + other.hdfsFileCount)
}

/**
 * Field-wise difference `this - other` over the four counters.
 *
 * The result is a fresh instance built from the four differences only, so it does
 * not carry over any sub-resource consumptions. No clamping is applied: a counter
 * becomes negative when `other` holds the larger value.
 *
 * @param other the consumption to take away from this one
 * @return a new ResourceConsumption holding the four differences
 */
def subtract(other: ResourceConsumption): ResourceConsumption = {
  val diskBytesLeft = diskBytesWritten - other.diskBytesWritten
  val diskFilesLeft = diskFileCount - other.diskFileCount
  val hdfsBytesLeft = hdfsBytesWritten - other.hdfsBytesWritten
  val hdfsFilesLeft = hdfsFileCount - other.hdfsFileCount
  ResourceConsumption(diskBytesLeft, diskFilesLeft, hdfsBytesLeft, hdfsFilesLeft)
}

def addSubResourceConsumptions(otherSubResourceConsumptions: Map[
String,
ResourceConsumption]): Map[String, ResourceConsumption] = {
Expand Down Expand Up @@ -77,4 +91,11 @@ case class ResourceConsumption(
s" hdfsFileCount: $hdfsFileCount," +
s" subResourceConsumptions: $subResourceConsumptionString)"
}

/**
 * Compact rendering of the four counters, without the sub-resource consumptions.
 * Byte counters are formatted through `Utils.bytesToString`; file counts are
 * printed as plain numbers.
 */
def simpleString: String = {
  val renderedFields = Seq(
    s"diskBytesWritten: ${Utils.bytesToString(diskBytesWritten)}",
    s"diskFileCount: $diskFileCount",
    s"hdfsBytesWritten: ${Utils.bytesToString(hdfsBytesWritten)}",
    s"hdfsFileCount: $hdfsFileCount")
  renderedFields.mkString("ResourceConsumption(", ", ", ")")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.celeborn.common.quota
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.util.Utils

case class Quota(
case class StorageQuota(
diskBytesWritten: Long,
diskFileCount: Long,
hdfsBytesWritten: Long,
Expand All @@ -34,3 +34,7 @@ case class Quota(
s"]"
}
}

object StorageQuota {

  /** Quota whose four limits are all set to `Long.MaxValue`. */
  val DEFAULT_QUOTA: StorageQuota = {
    val maxLimit = Long.MaxValue
    StorageQuota(maxLimit, maxLimit, maxLimit, maxLimit)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,7 @@
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -504,4 +501,10 @@ public boolean isWorkerAvailable(WorkerInfo workerInfo) {
/**
 * Stores the given application metadata under its appId, but only when no entry
 * exists for that appId yet (putIfAbsent); an existing entry is left unchanged.
 *
 * @param applicationMeta metadata to register, keyed by {@code applicationMeta.appId()}
 */
public void updateApplicationMeta(ApplicationMeta applicationMeta) {
applicationMetas.putIfAbsent(applicationMeta.appId(), applicationMeta);
}

/**
 * Copies the current contents of {@code workers} into a new list.
 *
 * <p>The copy is made while holding the monitor of {@code workers}; the returned list
 * is independent of later changes to {@code workers}.
 *
 * @return a new {@link ArrayList} holding the workers present at the time of the call
 */
public List<WorkerInfo> workerSnapshot() {
  final List<WorkerInfo> snapshot;
  synchronized (workers) {
    snapshot = new ArrayList<>(workers);
  }
  return snapshot;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,12 @@ private[celeborn] class Master(
private val hasHDFSStorage = conf.hasHDFSStorage
private val hasS3Storage = conf.hasS3Storage

private val quotaManager = new QuotaManager(conf, configService)
private val quotaManager = new QuotaManager(
statusSystem,
masterSource,
resourceConsumptionSource,
conf,
configService)
private val masterResourceConsumptionInterval = conf.masterResourceConsumptionInterval
private val userResourceConsumptions =
JavaUtils.newConcurrentHashMap[UserIdentifier, (ResourceConsumption, Long)]()
Expand Down Expand Up @@ -1098,7 +1103,7 @@ private[celeborn] class Master(
needCheckedWorkerList,
new util.ArrayList[WorkerInfo](
(statusSystem.shutdownWorkers.asScala ++ statusSystem.decommissionWorkers.asScala).asJava),
CheckQuotaResponse(isAvailable = true, "")))
quotaManager.checkApplicationQuotaStatus(appId)))
} else {
context.reply(OneWayMessageResponse)
}
Expand All @@ -1114,76 +1119,11 @@ private[celeborn] class Master(
}
}

/**
 * Computes the aggregated resource consumption of `userIdentifier`, registers the
 * user-level gauges for that user, and returns the computed consumption.
 *
 * Gauge registration is triggered on every call.
 * NOTE(review): presumably `addGauge` tolerates repeated registration — confirm.
 */
private def handleResourceConsumption(userIdentifier: UserIdentifier): ResourceConsumption = {
val userResourceConsumption = computeUserResourceConsumption(userIdentifier)
gaugeResourceConsumption(userIdentifier)
userResourceConsumption
}

/**
 * Registers the four resource consumption gauges (disk/HDFS file count and bytes
 * written) for a user or, when `applicationId` is given, for one application of
 * that user.
 *
 * Each gauge calls `computeResourceConsumption` whenever it is read and reports one
 * counter of the result.
 *
 * @param userIdentifier user whose consumption is exposed; its `toMap` supplies the labels
 * @param applicationId  optional application id; when non-null it is added to the labels
 *                       under `resourceConsumptionSource.applicationLabel`
 */
private def gaugeResourceConsumption(
    userIdentifier: UserIdentifier,
    applicationId: String = null): Unit = {
  val gaugeLabels =
    if (applicationId != null) {
      userIdentifier.toMap + (resourceConsumptionSource.applicationLabel -> applicationId)
    } else {
      userIdentifier.toMap
    }

  // One registration per counter; the counter is selected by `pick`.
  def register(gaugeName: String)(pick: ResourceConsumption => Long): Unit =
    resourceConsumptionSource.addGauge(gaugeName, gaugeLabels) { () =>
      pick(computeResourceConsumption(userIdentifier, applicationId))
    }

  register(ResourceConsumptionSource.DISK_FILE_COUNT)(_.diskFileCount)
  register(ResourceConsumptionSource.DISK_BYTES_WRITTEN)(_.diskBytesWritten)
  register(ResourceConsumptionSource.HDFS_FILE_COUNT)(_.hdfsFileCount)
  register(ResourceConsumptionSource.HDFS_BYTES_WRITTEN)(_.hdfsBytesWritten)
}

/**
 * Returns the resource consumption of a user, or of one of the user's applications.
 *
 * User level (`applicationId == null`): the aggregate is cached in
 * `userResourceConsumptions` together with its computation time and is reused until
 * it is older than `masterResourceConsumptionInterval`. It is recomputed only on a
 * miss or an expired entry.
 *
 * Application level: sums, over all workers, the sub-resource consumption each worker
 * reports for `applicationId` under this user. The per-worker maps are read directly
 * because the user-level aggregate is built with `ResourceConsumption.add`, which
 * appears to create instances without sub-resource consumptions, so dereferencing
 * them on the aggregate would hit null. An application unknown to every worker
 * yields an all-zero consumption.
 *
 * @param userIdentifier user whose consumption is requested
 * @param applicationId  optional application id; null selects the user-level aggregate
 * @return the (possibly cached) user aggregate or the per-application sum
 */
private def computeResourceConsumption(
    userIdentifier: UserIdentifier,
    applicationId: String = null): ResourceConsumption = {
  if (applicationId == null) {
    val now = System.currentTimeMillis()
    // `get` returns null for an unknown user, which Option turns into a cache miss.
    Option(userResourceConsumptions.get(userIdentifier))
      .collect {
        case (cached, updatedAt) if now - updatedAt <= masterResourceConsumptionInterval =>
          cached
      }
      .getOrElse {
        val refreshed = computeUserResourceConsumption(userIdentifier)
        userResourceConsumptions.put(userIdentifier, (refreshed, now))
        refreshed
      }
  } else {
    // Iterate so that equal values reported by different workers are all counted.
    statusSystem.workers.asScala.iterator
      .flatMap { workerInfo =>
        workerInfo.userResourceConsumption.asScala.get(userIdentifier)
          .flatMap(userConsumption => Option(userConsumption.subResourceConsumptions))
          .flatMap(perApp => Option(perApp.get(applicationId)))
      }
      .foldLeft(ResourceConsumption(0, 0, 0, 0))(_ add _)
  }
}

// TODO: Support calculate topN app resource consumption.
/**
 * Sums the resource consumption that every worker reports for `userIdentifier`.
 *
 * Workers without an entry for the user contribute nothing; with no matching worker
 * the result is an all-zero consumption.
 *
 * @param userIdentifier user whose consumption is aggregated
 * @return the field-wise sum over all workers
 */
private def computeUserResourceConsumption(
    userIdentifier: UserIdentifier): ResourceConsumption = {
  // Go through an iterator rather than transforming the `workers` view itself:
  // `flatMap` on the view keeps the source collection kind, so with a Set-backed
  // `workers` two workers reporting equal values would be merged into one element
  // and counted once.
  statusSystem.workers.asScala.iterator
    .flatMap(workerInfo => workerInfo.userResourceConsumption.asScala.get(userIdentifier))
    .foldLeft(ResourceConsumption(0, 0, 0, 0))(_ add _)
}

private[master] def handleCheckQuota(
userIdentifier: UserIdentifier,
context: RpcCallContext): Unit = {
val userResourceConsumption = handleResourceConsumption(userIdentifier)
if (conf.quotaEnabled) {
val (isAvailable, reason) =
quotaManager.checkQuotaSpaceAvailable(userIdentifier, userResourceConsumption)
context.reply(CheckQuotaResponse(isAvailable, reason))
context.reply(quotaManager.checkUserQuotaStatus(userIdentifier))
} else {
context.reply(CheckQuotaResponse(true, ""))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,6 @@ object MasterSource {
// Capacity
val DEVICE_CELEBORN_FREE_CAPACITY = "DeviceCelebornFreeBytes"
val DEVICE_CELEBORN_TOTAL_CAPACITY = "DeviceCelebornTotalBytes"

// Metric name constant.
// NOTE(review): presumably a timer around the resource consumption update — confirm at the use site.
val UPDATE_RESOURCE_CONSUMPTION_TIME = "UpdateResourceConsumptionTime"
}
Loading

0 comments on commit e70211f

Please sign in to comment.