diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 290ce60f92e..ca63442e074 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -661,6 +661,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def estimatedPartitionSizeForEstimationUpdateInterval: Long = get(ESTIMATED_PARTITION_SIZE_UPDATE_INTERVAL) def masterResourceConsumptionInterval: Long = get(MASTER_RESOURCE_CONSUMPTION_INTERVAL) + def masterUserDiskUsageThreshold: Long = get(MASTER_USER_DISK_USAGE_THRESHOLD) + def masterClusterDiskUsageThreshold: Long = get(MASTER_CLUSTER_DISK_USAGE_THRESHOLD) def clusterName: String = get(CLUSTER_NAME) // ////////////////////////////////////////////////////// @@ -1061,6 +1063,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def registerShuffleFilterExcludedWorkerEnabled: Boolean = get(REGISTER_SHUFFLE_FILTER_EXCLUDED_WORKER_ENABLED) + def interruptShuffleEnabled: Boolean = get(QUOTA_INTERRUPT_SHUFFLE_ENABLED) + // ////////////////////////////////////////////////////// // Worker // // ////////////////////////////////////////////////////// @@ -2841,6 +2845,26 @@ object CelebornConf extends Logging { .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("30s") + val MASTER_USER_DISK_USAGE_THRESHOLD: ConfigEntry[Long] = + buildConf("celeborn.master.userResourceConsumption.user.threshold") + .categories("master") + .doc("When user resource consumption exceeds quota, Master will " + + "interrupt some apps until user resource consumption is less " + + "than this value. 
Default value is Long.MaxValue which means disable check.") + .version("0.6.0") + .bytesConf(ByteUnit.BYTE) + .createWithDefault(Long.MaxValue) + + val MASTER_CLUSTER_DISK_USAGE_THRESHOLD: ConfigEntry[Long] = + buildConf("celeborn.master.userResourceConsumption.cluster.threshold") + .categories("master") + .doc("When cluster resource consumption exceeds quota, Master will " + + "interrupt some apps until cluster resource consumption is less " + + "than this value. Default value is Long.MaxValue which means disable check.") + .version("0.6.0") + .bytesConf(ByteUnit.BYTE) + .createWithDefault(Long.MaxValue) + val CLUSTER_NAME: ConfigEntry[String] = buildConf("celeborn.cluster.name") .categories("master", "worker") @@ -5185,7 +5209,7 @@ object CelebornConf extends Logging { .dynamic .doc("Quota dynamic configuration for written disk bytes.") .version("0.5.0") - .longConf + .bytesConf(ByteUnit.BYTE) .createWithDefault(Long.MaxValue) val QUOTA_DISK_FILE_COUNT: ConfigEntry[Long] = @@ -5203,7 +5227,7 @@ object CelebornConf extends Logging { .dynamic .doc("Quota dynamic configuration for written hdfs bytes.") .version("0.5.0") - .longConf + .bytesConf(ByteUnit.BYTE) .createWithDefault(Long.MaxValue) val QUOTA_HDFS_FILE_COUNT: ConfigEntry[Long] = @@ -5765,4 +5789,51 @@ object CelebornConf extends Logging { .booleanConf .createWithDefault(false) + val QUOTA_CLUSTER_DISK_BYTES_WRITTEN: ConfigEntry[Long] = + buildConf("celeborn.quota.cluster.diskBytesWritten") + .categories("quota") + .dynamic + .doc("Quota dynamic configuration for cluster written disk bytes.") + .version("0.6.0") + .bytesConf(ByteUnit.BYTE) + .createWithDefault(Long.MaxValue) + + val QUOTA_CLUSTER_DISK_FILE_COUNT: ConfigEntry[Long] = + buildConf("celeborn.quota.cluster.diskFileCount") + .categories("quota") + .dynamic + .doc("Quota dynamic configuration for cluster written disk file count.") + .version("0.6.0") + .longConf + .createWithDefault(Long.MaxValue) + + val QUOTA_CLUSTER_HDFS_BYTES_WRITTEN: 
ConfigEntry[Long] = + buildConf("celeborn.quota.cluster.hdfsBytesWritten") + .categories("quota") + .dynamic + .doc("Quota dynamic configuration for cluster written hdfs bytes.") + .version("0.6.0") + .bytesConf(ByteUnit.BYTE) + .createWithDefault(Long.MaxValue) + + val QUOTA_CLUSTER_HDFS_FILE_COUNT: ConfigEntry[Long] = + buildConf("celeborn.quota.cluster.hdfsFileCount") + .categories("quota") + .dynamic + .doc("Quota dynamic configuration for cluster written hdfs file count.") + .version("0.6.0") + .longConf + .createWithDefault(Long.MaxValue) + + val QUOTA_INTERRUPT_SHUFFLE_ENABLED: ConfigEntry[Boolean] = { + buildConf("celeborn.quota.interruptShuffle.enabled") + .categories("quota") + .dynamic + .doc("If enabled, the resource consumption used by the tenant exceeds " + + "celeborn.quota.tenant.xx, or the resource consumption of the entire cluster " + + "exceeds celeborn.quota.cluster.xx, some shuffles will be selected and interrupted.") + .version("0.6.0") + .booleanConf + .createWithDefault(false) + } } diff --git a/common/src/main/scala/org/apache/celeborn/common/quota/ResourceConsumption.scala b/common/src/main/scala/org/apache/celeborn/common/quota/ResourceConsumption.scala index 10d1114b985..d454bb894c2 100644 --- a/common/src/main/scala/org/apache/celeborn/common/quota/ResourceConsumption.scala +++ b/common/src/main/scala/org/apache/celeborn/common/quota/ResourceConsumption.scala @@ -30,6 +30,12 @@ case class ResourceConsumption( hdfsFileCount: Long, var subResourceConsumptions: util.Map[String, ResourceConsumption] = null) { + def withSubResourceConsumptions( + resourceConsumptions: util.Map[String, ResourceConsumption]): ResourceConsumption = { + subResourceConsumptions = resourceConsumptions + this + } + def add(other: ResourceConsumption): ResourceConsumption = { ResourceConsumption( diskBytesWritten + other.diskBytesWritten, @@ -38,6 +44,14 @@ case class ResourceConsumption( hdfsFileCount + other.hdfsFileCount) } + def subtract(other: 
ResourceConsumption): ResourceConsumption = { + ResourceConsumption( + diskBytesWritten - other.diskBytesWritten, + diskFileCount - other.diskFileCount, + hdfsBytesWritten - other.hdfsBytesWritten, + hdfsFileCount - other.hdfsFileCount) + } + def addSubResourceConsumptions(otherSubResourceConsumptions: Map[ String, ResourceConsumption]): Map[String, ResourceConsumption] = { @@ -77,4 +91,11 @@ case class ResourceConsumption( s" hdfsFileCount: $hdfsFileCount," + s" subResourceConsumptions: $subResourceConsumptionString)" } + + def simpleString: String = { + s"ResourceConsumption(diskBytesWritten: ${Utils.bytesToString(diskBytesWritten)}," + + s" diskFileCount: $diskFileCount," + + s" hdfsBytesWritten: ${Utils.bytesToString(hdfsBytesWritten)}," + + s" hdfsFileCount: $hdfsFileCount)" + } } diff --git a/common/src/main/scala/org/apache/celeborn/common/quota/Quota.scala b/common/src/main/scala/org/apache/celeborn/common/quota/StorageQuota.scala similarity index 90% rename from common/src/main/scala/org/apache/celeborn/common/quota/Quota.scala rename to common/src/main/scala/org/apache/celeborn/common/quota/StorageQuota.scala index 8a845225821..1a7a8e52abf 100644 --- a/common/src/main/scala/org/apache/celeborn/common/quota/Quota.scala +++ b/common/src/main/scala/org/apache/celeborn/common/quota/StorageQuota.scala @@ -20,7 +20,7 @@ package org.apache.celeborn.common.quota import org.apache.celeborn.common.internal.Logging import org.apache.celeborn.common.util.Utils -case class Quota( +case class StorageQuota( diskBytesWritten: Long, diskFileCount: Long, hdfsBytesWritten: Long, @@ -34,3 +34,7 @@ case class Quota( s"]" } } + +object StorageQuota { + val DEFAULT_QUOTA = StorageQuota(Long.MaxValue, Long.MaxValue, Long.MaxValue, Long.MaxValue) +} diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java index 
1631e90d34c..cc48cd0ab5c 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java @@ -22,10 +22,7 @@ import java.io.FileInputStream; import java.io.IOException; import java.nio.file.Files; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -504,4 +501,10 @@ public boolean isWorkerAvailable(WorkerInfo workerInfo) { public void updateApplicationMeta(ApplicationMeta applicationMeta) { applicationMetas.putIfAbsent(applicationMeta.appId(), applicationMeta); } + + public List<WorkerInfo> workerSnapshot() { + synchronized (workers) { + return new ArrayList<>(workers); + } + } } diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index 521c32b416e..ee14d81aed5 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -184,7 +184,12 @@ private[celeborn] class Master( private val hasHDFSStorage = conf.hasHDFSStorage private val hasS3Storage = conf.hasS3Storage - private val quotaManager = new QuotaManager(conf, configService) + private val quotaManager = new QuotaManager( + statusSystem, + masterSource, + resourceConsumptionSource, + conf, + configService) private val masterResourceConsumptionInterval = conf.masterResourceConsumptionInterval private val userResourceConsumptions = JavaUtils.newConcurrentHashMap[UserIdentifier, (ResourceConsumption, Long)]() @@ -1098,7 +1103,7 @@ private[celeborn] class Master( needCheckedWorkerList, new util.ArrayList[WorkerInfo]( 
(statusSystem.shutdownWorkers.asScala ++ statusSystem.decommissionWorkers.asScala).asJava), - CheckQuotaResponse(isAvailable = true, ""))) + quotaManager.checkApplicationQuotaStatus(appId))) } else { context.reply(OneWayMessageResponse) } @@ -1114,76 +1119,11 @@ private[celeborn] class Master( } } - private def handleResourceConsumption(userIdentifier: UserIdentifier): ResourceConsumption = { - val userResourceConsumption = computeUserResourceConsumption(userIdentifier) - gaugeResourceConsumption(userIdentifier) - userResourceConsumption - } - - private def gaugeResourceConsumption( - userIdentifier: UserIdentifier, - applicationId: String = null): Unit = { - val resourceConsumptionLabel = - if (applicationId == null) userIdentifier.toMap - else userIdentifier.toMap + (resourceConsumptionSource.applicationLabel -> applicationId) - resourceConsumptionSource.addGauge( - ResourceConsumptionSource.DISK_FILE_COUNT, - resourceConsumptionLabel) { () => - computeResourceConsumption(userIdentifier, applicationId).diskFileCount - } - resourceConsumptionSource.addGauge( - ResourceConsumptionSource.DISK_BYTES_WRITTEN, - resourceConsumptionLabel) { () => - computeResourceConsumption(userIdentifier, applicationId).diskBytesWritten - } - resourceConsumptionSource.addGauge( - ResourceConsumptionSource.HDFS_FILE_COUNT, - resourceConsumptionLabel) { () => - computeResourceConsumption(userIdentifier, applicationId).hdfsFileCount - } - resourceConsumptionSource.addGauge( - ResourceConsumptionSource.HDFS_BYTES_WRITTEN, - resourceConsumptionLabel) { () => - computeResourceConsumption(userIdentifier, applicationId).hdfsBytesWritten - } - } - - private def computeResourceConsumption( - userIdentifier: UserIdentifier, - applicationId: String = null): ResourceConsumption = { - val newResourceConsumption = computeUserResourceConsumption(userIdentifier) - if (applicationId == null) { - val current = System.currentTimeMillis() - if (userResourceConsumptions.containsKey(userIdentifier)) { - val 
resourceConsumptionAndUpdateTime = userResourceConsumptions.get(userIdentifier) - if (current - resourceConsumptionAndUpdateTime._2 <= masterResourceConsumptionInterval) { - return resourceConsumptionAndUpdateTime._1 - } - } - userResourceConsumptions.put(userIdentifier, (newResourceConsumption, current)) - newResourceConsumption - } else { - newResourceConsumption.subResourceConsumptions.get(applicationId) - } - } - - // TODO: Support calculate topN app resource consumption. - private def computeUserResourceConsumption( - userIdentifier: UserIdentifier): ResourceConsumption = { - val resourceConsumption = statusSystem.workers.asScala.flatMap { - workerInfo => workerInfo.userResourceConsumption.asScala.get(userIdentifier) - }.foldRight(ResourceConsumption(0, 0, 0, 0))(_ add _) - resourceConsumption - } - private[master] def handleCheckQuota( userIdentifier: UserIdentifier, context: RpcCallContext): Unit = { - val userResourceConsumption = handleResourceConsumption(userIdentifier) if (conf.quotaEnabled) { - val (isAvailable, reason) = - quotaManager.checkQuotaSpaceAvailable(userIdentifier, userResourceConsumption) - context.reply(CheckQuotaResponse(isAvailable, reason)) + context.reply(quotaManager.checkUserQuotaStatus(userIdentifier)) } else { context.reply(CheckQuotaResponse(true, "")) } diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala index b2e72524486..6cc97c136f2 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala @@ -60,4 +60,6 @@ object MasterSource { // Capacity val DEVICE_CELEBORN_FREE_CAPACITY = "DeviceCelebornFreeBytes" val DEVICE_CELEBORN_TOTAL_CAPACITY = "DeviceCelebornTotalBytes" + + val UPDATE_RESOURCE_CONSUMPTION_TIME = "UpdateResourceConsumptionTime" } diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManager.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManager.scala index a5d446368a6..9a9b49a4a11 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManager.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManager.scala @@ -16,96 +16,259 @@ */ package org.apache.celeborn.service.deploy.master.quota +import java.util.{Map => JMap} +import java.util.concurrent.TimeUnit + +import scala.collection.JavaConverters._ + import org.apache.celeborn.common.CelebornConf import org.apache.celeborn.common.identity.UserIdentifier import org.apache.celeborn.common.internal.Logging -import org.apache.celeborn.common.quota.{Quota, ResourceConsumption} -import org.apache.celeborn.common.util.Utils +import org.apache.celeborn.common.metrics.source.ResourceConsumptionSource +import org.apache.celeborn.common.metrics.source.ResourceConsumptionSource._ +import org.apache.celeborn.common.protocol.message.ControlMessages.CheckQuotaResponse +import org.apache.celeborn.common.quota.{ResourceConsumption, StorageQuota} +import org.apache.celeborn.common.util.{JavaUtils, ThreadUtils, Utils} import org.apache.celeborn.server.common.service.config.ConfigService +import org.apache.celeborn.service.deploy.master.MasterSource +import org.apache.celeborn.service.deploy.master.MasterSource.UPDATE_RESOURCE_CONSUMPTION_TIME +import org.apache.celeborn.service.deploy.master.clustermeta.AbstractMetaManager +import org.apache.celeborn.service.deploy.master.quota.QuotaStatus._ -class QuotaManager(celebornConf: CelebornConf, configService: ConfigService) extends Logging { - val DEFAULT_QUOTA = Quota( - celebornConf.get(CelebornConf.QUOTA_DISK_BYTES_WRITTEN), - celebornConf.get(CelebornConf.QUOTA_DISK_FILE_COUNT), - celebornConf.get(CelebornConf.QUOTA_HDFS_BYTES_WRITTEN), - 
celebornConf.get(CelebornConf.QUOTA_HDFS_FILE_COUNT)) - def getQuota(userIdentifier: UserIdentifier): Quota = { - if (configService != null) { - val config = - configService.getTenantUserConfigFromCache(userIdentifier.tenantId, userIdentifier.name) - config.getQuota - } else { - DEFAULT_QUOTA - } +class QuotaManager( + statusSystem: AbstractMetaManager, + masterSource: MasterSource, + resourceConsumptionSource: ResourceConsumptionSource, + celebornConf: CelebornConf, + configService: ConfigService) extends Logging { + + val userQuotaStatus: JMap[UserIdentifier, QuotaStatus] = JavaUtils.newConcurrentHashMap() + val appQuotaStatus: JMap[String, QuotaStatus] = JavaUtils.newConcurrentHashMap() + val userResourceConsumptionMap: JMap[UserIdentifier, ResourceConsumption] = + JavaUtils.newConcurrentHashMap() + private val quotaChecker = + ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-quota-checker") + quotaChecker.scheduleWithFixedDelay( + () => { + try { + updateResourceConsumption() + } catch { + case t: Throwable => logError("Update user resource consumption failed.", t) + } + }, + 0L, + celebornConf.masterResourceConsumptionInterval, + TimeUnit.MILLISECONDS) + + def handleAppLost(appId: String): Unit = { + appQuotaStatus.remove(appId) + } + + def checkUserQuotaStatus(userIdentifier: UserIdentifier): CheckQuotaResponse = { + val userStatus = userQuotaStatus.getOrDefault(userIdentifier, QuotaStatus()) + CheckQuotaResponse(!userStatus.exceed, userStatus.exceedReason) + } + + def checkApplicationQuotaStatus(applicationId: String): CheckQuotaResponse = { + val status = appQuotaStatus.getOrDefault(applicationId, QuotaStatus()) + CheckQuotaResponse(!status.exceed, status.exceedReason) + } + + def getUserStorageQuota(user: UserIdentifier): StorageQuota = { + Option(configService) + .map(_.getTenantUserConfigFromCache(user.tenantId, user.name).getTenantStorageQuota) + .getOrElse(StorageQuota.DEFAULT_QUOTA) + } + + def getClusterStorageQuota: StorageQuota = { + 
Option(configService) + .map(_.getSystemConfigFromCache.getClusterStorageQuota) + .getOrElse(StorageQuota.DEFAULT_QUOTA) } - def checkQuotaSpaceAvailable( - userIdentifier: UserIdentifier, - resourceResumption: ResourceConsumption): (Boolean, String) = { - val quota = getQuota(userIdentifier) + private def interruptShuffleEnabled: Boolean = { + Option(configService) + .map(_.getSystemConfigFromCache.interruptShuffleEnabled()) + .getOrElse(celebornConf.interruptShuffleEnabled) + } + + private def checkUserQuotaSpace( + user: UserIdentifier, + consumption: ResourceConsumption): QuotaStatus = { + val quota = getUserStorageQuota(user) + checkQuotaSpace(s"$USER_EXHAUSTED user: $user. ", consumption, quota) + } + + private def checkClusterQuotaSpace( + consumption: ResourceConsumption, + quota: StorageQuota): QuotaStatus = { + checkQuotaSpace(CLUSTER_EXHAUSTED, consumption, quota) + } + private def checkQuotaSpace( + reason: String, + consumption: ResourceConsumption, + quota: StorageQuota): QuotaStatus = { val checkResults = Seq( - checkDiskBytesWritten(userIdentifier, resourceResumption.diskBytesWritten, quota), - checkDiskFileCount(userIdentifier, resourceResumption.diskFileCount, quota), - checkHdfsBytesWritten(userIdentifier, resourceResumption.hdfsBytesWritten, quota), - checkHdfsFileCount(userIdentifier, resourceResumption.hdfsFileCount, quota)) + checkQuota( + consumption.diskBytesWritten, + quota.diskBytesWritten, + "DISK_BYTES_WRITTEN", + Utils.bytesToString), + checkQuota( + consumption.diskFileCount, + quota.diskFileCount, + "DISK_FILE_COUNT", + _.toString), + checkQuota( + consumption.hdfsBytesWritten, + quota.hdfsBytesWritten, + "HDFS_BYTES_WRITTEN", + Utils.bytesToString), + checkQuota( + consumption.hdfsFileCount, + quota.hdfsFileCount, + "HDFS_FILE_COUNT", + _.toString)) val exceed = checkResults.foldLeft(false)(_ || _._1) - val reason = checkResults.foldLeft("")(_ + _._2) - (!exceed, reason) + val exceedReason = + if (exceed) { + s"$reason 
${checkResults.foldLeft("")(_ + _._2)}" + } else { + "" + } + QuotaStatus(exceed, exceedReason) } - private def checkDiskBytesWritten( - userIdentifier: UserIdentifier, + private def checkQuota( value: Long, - quota: Quota): (Boolean, String) = { - val exceed = (quota.diskBytesWritten > 0 && value >= quota.diskBytesWritten) + quota: Long, + quotaType: String, + format: Long => String): (Boolean, String) = { + val exceed = quota > 0 && value >= quota var reason = "" if (exceed) { - reason = s"User $userIdentifier used diskBytesWritten (${Utils.bytesToString(value)}) " + - s"exceeds quota (${Utils.bytesToString(quota.diskBytesWritten)}). " + reason = s"$quotaType(${format(value)}) exceeds quota(${format(quota)}). " logWarning(reason) } (exceed, reason) } - private def checkDiskFileCount( - userIdentifier: UserIdentifier, - value: Long, - quota: Quota): (Boolean, String) = { - val exceed = (quota.diskFileCount > 0 && value >= quota.diskFileCount) - var reason = "" - if (exceed) { - reason = - s"User $userIdentifier used diskFileCount($value) exceeds quota(${quota.diskFileCount}). " - logWarning(reason) + private def checkConsumptionExceeded( + used: ResourceConsumption, + threshold: StorageQuota): Boolean = { + used.diskBytesWritten >= threshold.diskBytesWritten || + used.diskFileCount >= threshold.diskFileCount || + used.hdfsBytesWritten >= threshold.hdfsBytesWritten || + used.hdfsFileCount >= threshold.hdfsFileCount + } + + def updateResourceConsumption(): Unit = { + masterSource.sample(UPDATE_RESOURCE_CONSUMPTION_TIME, this.getClass.getSimpleName, Map.empty) { + val clusterQuota = getClusterStorageQuota + var clusterResourceConsumption = ResourceConsumption(0, 0, 0, 0) + val userResourceConsumption = statusSystem.workerSnapshot.asScala.flatMap { workerInfo => + workerInfo.userResourceConsumption.asScala + }.groupBy(_._1).map { case (userIdentifier, userConsumptionList) => + // Step 1: Compute user consumption and set quota status. 
+ val resourceConsumptionList = userConsumptionList.map(_._2) + val resourceConsumption = computeUserResourceConsumption(resourceConsumptionList) + userQuotaStatus.put( + userIdentifier, + checkUserQuotaSpace(userIdentifier, resourceConsumption)) + + // Step 2: Update user resource consumption metrics. + // For extract metrics + userResourceConsumptionMap.put(userIdentifier, resourceConsumption) + registerUserResourceConsumptionMetrics(userIdentifier) + + // Step 3: Expire user level exceeded app except already expired app + clusterResourceConsumption = clusterResourceConsumption.add(resourceConsumption) + val userQuota = getUserStorageQuota(userIdentifier) + if (interruptShuffleEnabled && checkConsumptionExceeded(resourceConsumption, userQuota)) { + val subResourceConsumptions = computeSubAppConsumption(resourceConsumptionList) + // Compute expired size + val (expired, notExpired) = subResourceConsumptions.partition { case (app, _) => + appQuotaStatus.containsKey(app) + } + val userConsumptions = expired.values.foldLeft(resourceConsumption)(_.subtract(_)) + expireApplication(userConsumptions, userQuota, notExpired.toSeq, USER_EXHAUSTED) + (Option(subResourceConsumptions), resourceConsumptionList) + } else { + (None, resourceConsumptionList) + } + } + + // Step 4: Expire cluster level exceeded app except already expired app + if (interruptShuffleEnabled && + checkClusterQuotaSpace(clusterResourceConsumption, clusterQuota).exceed) { + val appConsumptions = userResourceConsumption.map { + case (None, subConsumptionList) => computeSubAppConsumption(subConsumptionList) + case (Some(subConsumptions), _) => subConsumptions + }.flatMap(_.toSeq).toSeq + + // Compute nonExpired app total usage + val (expired, notExpired) = appConsumptions.partition { case (app, _) => + appQuotaStatus.containsKey(app) + } + clusterResourceConsumption = + expired.map(_._2).foldLeft(clusterResourceConsumption)(_.subtract(_)) + expireApplication(clusterResourceConsumption, clusterQuota, 
notExpired, CLUSTER_EXHAUSTED) + } } - (exceed, reason) } - private def checkHdfsBytesWritten( - userIdentifier: UserIdentifier, - value: Long, - quota: Quota): (Boolean, String) = { - val exceed = (quota.hdfsBytesWritten > 0 && value >= quota.hdfsBytesWritten) - var reason = "" - if (exceed) { - reason = s"User $userIdentifier used hdfsBytesWritten(${Utils.bytesToString(value)}) " + - s"exceeds quota(${Utils.bytesToString(quota.hdfsBytesWritten)}). " - logWarning(reason) + private def expireApplication( + used: ResourceConsumption, + threshold: StorageQuota, + appMap: Seq[(String, ResourceConsumption)], + expireReason: String): Unit = { + var nonExpired = used + if (checkConsumptionExceeded(used, threshold)) { + val sortedConsumption = + appMap.sortBy(_._2)(Ordering.by((r: ResourceConsumption) => + ( + r.diskBytesWritten, + r.diskFileCount, + r.hdfsBytesWritten, + r.hdfsFileCount)).reverse) + for ((appId, consumption) <- sortedConsumption + if checkConsumptionExceeded(nonExpired, threshold)) { + val reason = s"$expireReason Used: ${consumption.simpleString}, Threshold: $threshold" + appQuotaStatus.put(appId, QuotaStatus(exceed = true, reason)) + nonExpired = nonExpired.subtract(consumption) + } } - (exceed, reason) } - private def checkHdfsFileCount( - userIdentifier: UserIdentifier, - value: Long, - quota: Quota): (Boolean, String) = { - val exceed = (quota.hdfsFileCount > 0 && value >= quota.hdfsFileCount) - var reason = "" - if (exceed) { - reason = - s"User $userIdentifier used hdfsFileCount($value) exceeds quota(${quota.hdfsFileCount}). 
" - logWarning(reason) + private def computeUserResourceConsumption( + consumptions: Seq[ResourceConsumption]): ResourceConsumption = { + consumptions.foldRight(ResourceConsumption(0, 0, 0, 0))(_ add _) + } + + private def computeSubAppConsumption( + resourceConsumptionList: Seq[ResourceConsumption]): Map[String, ResourceConsumption] = { + resourceConsumptionList.foldRight(Map.empty[String, ResourceConsumption]) { + case (consumption, subConsumption) => + consumption.addSubResourceConsumptions(subConsumption) + } + } + + private def getResourceConsumption(userIdentifier: UserIdentifier): ResourceConsumption = { + userResourceConsumptionMap.getOrDefault(userIdentifier, ResourceConsumption(0, 0, 0, 0)) + } + + private def registerUserResourceConsumptionMetrics(userIdentifier: UserIdentifier): Unit = { + resourceConsumptionSource.addGauge(DISK_FILE_COUNT, userIdentifier.toMap) { () => + getResourceConsumption(userIdentifier).diskBytesWritten + } + resourceConsumptionSource.addGauge(DISK_BYTES_WRITTEN, userIdentifier.toMap) { () => + getResourceConsumption(userIdentifier).diskBytesWritten + } + resourceConsumptionSource.addGauge(HDFS_FILE_COUNT, userIdentifier.toMap) { () => + getResourceConsumption(userIdentifier).hdfsFileCount + } + resourceConsumptionSource.addGauge(HDFS_BYTES_WRITTEN, userIdentifier.toMap) { () => + getResourceConsumption(userIdentifier).hdfsBytesWritten } - (exceed, reason) } } diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/quota/QuotaStatus.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/quota/QuotaStatus.scala new file mode 100644 index 00000000000..561ae7fe793 --- /dev/null +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/quota/QuotaStatus.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.service.deploy.master.quota + +import QuotaStatus._ + +case class QuotaStatus(exceed: Boolean = false, exceedReason: String = NORMAL) + +object QuotaStatus { + val NORMAL: String = "" + val CLUSTER_EXHAUSTED: String = + "Interrupt application caused by the cluster storage usage reach threshold." + val USER_EXHAUSTED: String = + "Interrupt or reject application caused by the user storage usage reach threshold." +} diff --git a/master/src/test/resources/dynamicConfig-quota-2.yaml b/master/src/test/resources/dynamicConfig-quota-2.yaml new file mode 100644 index 00000000000..fd5e27aaa82 --- /dev/null +++ b/master/src/test/resources/dynamicConfig-quota-2.yaml @@ -0,0 +1,38 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +- level: SYSTEM + config: + celeborn.quota.tenant.diskBytesWritten: 1000G + celeborn.quota.tenant.diskFileCount: 100 + celeborn.quota.tenant.hdfsBytesWritten: 1G + celeborn.quota.cluster.diskBytesWritten: 130G + celeborn.quota.interruptShuffle.enabled: true + +- tenantId: tenant_01 + level: TENANT + config: + celeborn.quota.tenant.diskBytesWritten: 10G + celeborn.quota.tenant.diskFileCount: 1000 + celeborn.quota.tenant.hdfsBytesWritten: 10G + users: + - name: Jerry + config: + celeborn.quota.tenant.diskBytesWritten: 100G + celeborn.quota.tenant.diskFileCount: 10000 + celeborn.master.userResourceConsumption.user.threshold: 120G + + diff --git a/master/src/test/resources/dynamicConfig-quota.yaml b/master/src/test/resources/dynamicConfig-quota.yaml index 156a3f692b4..8a7d33d03f9 100644 --- a/master/src/test/resources/dynamicConfig-quota.yaml +++ b/master/src/test/resources/dynamicConfig-quota.yaml @@ -19,6 +19,7 @@ celeborn.quota.tenant.diskBytesWritten: 1G celeborn.quota.tenant.diskFileCount: 100 celeborn.quota.tenant.hdfsBytesWritten: 1G + celeborn.quota.interruptShuffle.enabled: true - tenantId: tenant_01 level: TENANT diff --git a/master/src/test/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManagerSuite.scala b/master/src/test/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManagerSuite.scala index 5e5e93017ba..bd29815f338 100644 --- a/master/src/test/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManagerSuite.scala +++ b/master/src/test/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManagerSuite.scala @@ 
-19,16 +19,26 @@ package org.apache.celeborn.service.deploy.master.quota import java.io.File -import org.junit.Assert.assertEquals +import scala.collection.JavaConverters.{mapAsJavaMapConverter, mapAsScalaMapConverter} +import scala.util.Random + +import org.junit.Assert.{assertEquals, assertFalse, assertTrue} import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import org.scalatest.funsuite.AnyFunSuite import org.apache.celeborn.common.CelebornConf import org.apache.celeborn.common.identity.UserIdentifier import org.apache.celeborn.common.internal.Logging -import org.apache.celeborn.common.quota.{Quota, ResourceConsumption} +import org.apache.celeborn.common.meta.WorkerInfo +import org.apache.celeborn.common.metrics.source.ResourceConsumptionSource +import org.apache.celeborn.common.protocol.TransportModuleConstants +import org.apache.celeborn.common.protocol.message.ControlMessages.CheckQuotaResponse +import org.apache.celeborn.common.quota.{ResourceConsumption, StorageQuota} +import org.apache.celeborn.common.rpc.RpcEnv import org.apache.celeborn.common.util.Utils -import org.apache.celeborn.server.common.service.config.DynamicConfigServiceFactory +import org.apache.celeborn.server.common.service.config.{ConfigService, DynamicConfigServiceFactory, FsConfigServiceImpl} +import org.apache.celeborn.service.deploy.master.MasterSource +import org.apache.celeborn.service.deploy.master.clustermeta.SingleMasterMetaManager class QuotaManagerSuite extends AnyFunSuite with BeforeAndAfterAll @@ -36,35 +46,77 @@ class QuotaManagerSuite extends AnyFunSuite with Logging { protected var quotaManager: QuotaManager = _ + private var resourceConsumptionSource: ResourceConsumptionSource = _ + + val worker = new WorkerInfo( + "localhost", + 10001, + 10002, + 10003, + 10004) + + val conf = new CelebornConf() + + var configService: ConfigService = _ + // helper function final protected def getTestResourceFile(file: String): File = { new 
File(getClass.getClassLoader.getResource(file).getFile) } override def beforeAll(): Unit = { - val conf = new CelebornConf() conf.set(CelebornConf.DYNAMIC_CONFIG_STORE_BACKEND, "FS") conf.set( CelebornConf.DYNAMIC_CONFIG_STORE_FS_PATH.key, getTestResourceFile("dynamicConfig-quota.yaml").getPath) - quotaManager = new QuotaManager(conf, DynamicConfigServiceFactory.getConfigService(conf)) + val statusSystem = new SingleMasterMetaManager( + RpcEnv.create( + "test-rpc", + TransportModuleConstants.RPC_SERVICE_MODULE, + "localhost", + 9001, + conf, + None), + conf) + statusSystem.workers.add(worker) + resourceConsumptionSource = new ResourceConsumptionSource(conf, "Master") + configService = DynamicConfigServiceFactory.getConfigService(conf) + quotaManager = new QuotaManager( + statusSystem, + new MasterSource(conf), + resourceConsumptionSource, + conf, + configService) } test("test celeborn quota conf") { + configService.refreshCache() assertEquals( - quotaManager.getQuota(UserIdentifier("tenant_01", "Jerry")), - Quota(Utils.byteStringAsBytes("100G"), 10000, Utils.byteStringAsBytes("10G"), Long.MaxValue)) + quotaManager.getUserStorageQuota(UserIdentifier("tenant_01", "Jerry")), + StorageQuota( + Utils.byteStringAsBytes("100G"), + 10000, + Utils.byteStringAsBytes("10G"), + Long.MaxValue)) // Fallback to tenant level assertEquals( - quotaManager.getQuota(UserIdentifier("tenant_01", "name_not_exist")), - Quota(Utils.byteStringAsBytes("10G"), 1000, Utils.byteStringAsBytes("10G"), Long.MaxValue)) + quotaManager.getUserStorageQuota(UserIdentifier("tenant_01", "name_not_exist")), + StorageQuota( + Utils.byteStringAsBytes("10G"), + 1000, + Utils.byteStringAsBytes("10G"), + Long.MaxValue)) // Fallback to system level assertEquals( - quotaManager.getQuota(UserIdentifier("tenant_not_exist", "Tom")), - Quota(Utils.byteStringAsBytes("1G"), 100, Utils.byteStringAsBytes("1G"), Long.MaxValue)) + quotaManager.getUserStorageQuota(UserIdentifier("tenant_not_exist", "Tom")), + StorageQuota( 
+ Utils.byteStringAsBytes("1G"), + 100, + Utils.byteStringAsBytes("1G"), + Long.MaxValue)) } - test("test check quota return result") { + test("test check user quota return result") { val user = UserIdentifier("tenant_01", "Jerry") val rc1 = ResourceConsumption(Utils.byteStringAsBytes("10G"), 20, Utils.byteStringAsBytes("1G"), 40) @@ -77,22 +129,431 @@ class QuotaManagerSuite extends AnyFunSuite Utils.byteStringAsBytes("30G"), 40) - val res1 = quotaManager.checkQuotaSpaceAvailable(user, rc1) - val res2 = quotaManager.checkQuotaSpaceAvailable(user, rc2) - val res3 = quotaManager.checkQuotaSpaceAvailable(user, rc3) + addUserConsumption(user, rc1) + quotaManager.updateResourceConsumption() + val res1 = checkUserQuota(user) - val exp1 = (true, "") - val exp2 = ( + addUserConsumption(user, rc2) + quotaManager.updateResourceConsumption() + val res2 = checkUserQuota(user) + + addUserConsumption(user, rc3) + quotaManager.updateResourceConsumption() + val res3 = checkUserQuota(user) + + val exp1 = CheckQuotaResponse(true, "") + val exp2 = CheckQuotaResponse( false, - s"User $user used hdfsBytesWritten(30.0 GiB) exceeds quota(10.0 GiB). ") - val exp3 = ( + s"Interrupt or reject application caused by the user storage usage reach threshold. " + + s"user: `tenant_01`.`Jerry`. " + + s"HDFS_BYTES_WRITTEN(30.0 GiB) exceeds quota(10.0 GiB). ") + val exp3 = CheckQuotaResponse( false, - s"User $user used diskBytesWritten (200.0 GiB) exceeds quota (100.0 GiB). " + - s"User $user used diskFileCount(20000) exceeds quota(10000). " + - s"User $user used hdfsBytesWritten(30.0 GiB) exceeds quota(10.0 GiB). ") + s"Interrupt or reject application caused by the user storage usage reach threshold. " + + s"user: `tenant_01`.`Jerry`. " + + s"DISK_BYTES_WRITTEN(200.0 GiB) exceeds quota(100.0 GiB). " + + s"DISK_FILE_COUNT(20000) exceeds quota(10000). " + + s"HDFS_BYTES_WRITTEN(30.0 GiB) exceeds quota(10.0 GiB). 
") assert(res1 == exp1) assert(res2 == exp2) assert(res3 == exp3) + clearUserConsumption() + } + + test("test check application quota return result") { + val user = UserIdentifier("tenant_01", "Jerry") + var rc = + ResourceConsumption( + Utils.byteStringAsBytes("200G"), + 20000, + Utils.byteStringAsBytes("30G"), + 40) + rc.withSubResourceConsumptions( + Map( + "app1" -> ResourceConsumption( + Utils.byteStringAsBytes("150G"), + 15000, + Utils.byteStringAsBytes("25G"), + 20), + "app2" -> ResourceConsumption( + Utils.byteStringAsBytes("50G"), + 5000, + Utils.byteStringAsBytes("5G"), + 20)).asJava) + + addUserConsumption(user, rc) + conf.set("celeborn.quota.cluster.diskBytesWritten", "60gb") + configService.refreshCache() + quotaManager.updateResourceConsumption() + var res1 = checkUserQuota(user) + var res2 = checkApplicationQuota(user, "app1") + var res3 = checkApplicationQuota(user, "app2") + + val succeed = CheckQuotaResponse(true, "") + val failed = CheckQuotaResponse( + false, + s"Interrupt or reject application caused by the user storage usage reach threshold. " + + s"user: `tenant_01`.`Jerry`. " + + s"DISK_BYTES_WRITTEN(200.0 GiB) exceeds quota(100.0 GiB). " + + s"DISK_FILE_COUNT(20000) exceeds quota(10000). " + + s"HDFS_BYTES_WRITTEN(30.0 GiB) exceeds quota(10.0 GiB). ") + assert(res1 == failed) + assert(res2 == CheckQuotaResponse( + false, + "Interrupt or reject application caused by the user storage usage reach threshold. 
" + + "Used: " + + "ResourceConsumption(" + + "diskBytesWritten: 150.0 GiB, " + + "diskFileCount: 15000, " + + "hdfsBytesWritten: 25.0 GiB, " + + "hdfsFileCount: 20), " + + "Threshold: " + + "Quota[" + + "diskBytesWritten=100.0 GiB, " + + "diskFileCount=10000, " + + "hdfsBytesWritten=10.0 GiB, " + + "hdfsFileCount=9223372036854775807]")) + assert(res3 == succeed) + + conf.set("celeborn.quota.cluster.diskBytesWritten", "50gb") + configService.refreshCache() + quotaManager.updateResourceConsumption() + res1 = checkUserQuota(user) + res2 = checkApplicationQuota(user, "app1") + res3 = checkApplicationQuota(user, "app2") + + assert(res1 == failed) + assert(res2 == CheckQuotaResponse( + false, + "Interrupt or reject application caused by the user storage usage reach threshold. " + + "Used: ResourceConsumption(" + + "diskBytesWritten: 150.0 GiB, " + + "diskFileCount: 15000, " + + "hdfsBytesWritten: 25.0 GiB, " + + "hdfsFileCount: 20), " + + "Threshold: Quota[" + + "diskBytesWritten=100.0 GiB, " + + "diskFileCount=10000, " + + "hdfsBytesWritten=10.0 GiB, " + + "hdfsFileCount=9223372036854775807]")) + assert(res3 == CheckQuotaResponse( + false, + "Interrupt application caused by the cluster storage usage reach threshold. 
" + + "Used: ResourceConsumption(" + + "diskBytesWritten: 50.0 GiB, " + + "diskFileCount: 5000, " + + "hdfsBytesWritten: 5.0 GiB, " + + "hdfsFileCount: 20), " + + "Threshold: " + + "Quota[" + + "diskBytesWritten=50.0 GiB, " + + "diskFileCount=9223372036854775807, " + + "hdfsBytesWritten=8.0 EiB, " + + "hdfsFileCount=9223372036854775807]")) + clearUserConsumption() + + rc = + ResourceConsumption( + Utils.byteStringAsBytes("50G"), + 1000, + Utils.byteStringAsBytes("5G"), + 40) + rc.withSubResourceConsumptions( + Map( + "app1" -> ResourceConsumption( + Utils.byteStringAsBytes("40G"), + 500, + Utils.byteStringAsBytes("3G"), + 20), + "app2" -> ResourceConsumption( + Utils.byteStringAsBytes("10G"), + 500, + Utils.byteStringAsBytes("2G"), + 20)).asJava) + + addUserConsumption(user, rc) + conf.set("celeborn.quota.cluster.diskBytesWritten", "20gb") + configService.refreshCache() + quotaManager.updateResourceConsumption() + + res1 = checkUserQuota(user) + res2 = checkApplicationQuota(user, "app1") + res3 = checkApplicationQuota(user, "app2") + + assert(res1 == succeed) + assert(res2 == CheckQuotaResponse( + false, + "Interrupt application caused by the cluster storage usage reach threshold. 
" + + "Used: " + + "ResourceConsumption(" + + "diskBytesWritten: 40.0 GiB, " + + "diskFileCount: 500, " + + "hdfsBytesWritten: 3.0 GiB, " + + "hdfsFileCount: 20), " + + "Threshold: " + + "Quota[diskBytesWritten=20.0 GiB, " + + "diskFileCount=9223372036854775807, " + + "hdfsBytesWritten=8.0 EiB, " + + "hdfsFileCount=9223372036854775807]")) + assert(res3 == CheckQuotaResponse(true, "")) + + clearUserConsumption() + } + + test("test handleResourceConsumption time - case1") { + // 1000 users 100wapplications, all exceeded + conf.set("celeborn.quota.tenant.diskBytesWritten", "1mb") + conf.set("celeborn.quota.cluster.diskBytesWritten", "1mb") + configService.refreshCache() + val MAX = 2L * 1024 * 1024 * 1024 + val MIN = 1L * 1024 * 1024 * 1024 + val random = new Random() + for (i <- 0 until 1000) { + val user = UserIdentifier("default", s"user$i") + val subResourceConsumption = (0 until 1000).map { + index => + val appId = s"$user$i app$index" + val consumption = ResourceConsumption( + MIN + Math.abs(random.nextLong()) % (MAX - MIN), + MIN + Math.abs(random.nextLong()) % (MAX - MIN), + MIN + Math.abs(random.nextLong()) % (MAX - MIN), + MIN + Math.abs(random.nextLong()) % (MAX - MIN)) + (appId, consumption) + }.toMap + val userConsumption = subResourceConsumption.values.foldRight( + ResourceConsumption(0, 0, 0, 0))(_ add _) + userConsumption.subResourceConsumptions = subResourceConsumption.asJava + addUserConsumption(user, userConsumption) + } + + val start = System.currentTimeMillis() + quotaManager.updateResourceConsumption() + val duration = System.currentTimeMillis() - start + print(s"duration=$duration") + + val res = resourceConsumptionSource.getMetrics() + for (i <- 0 until 1000) { + val user = UserIdentifier("default", s"user$i") + assert(res.contains( + s"""metrics_diskFileCount_Value{name="user$i",role="Master",tenantId="default"}""")) + assert(res.contains( + s"""metrics_diskFileCount_Value{name="user$i",role="Master",tenantId="default"}""")) + 
assert(res.contains( + s"""metrics_hdfsFileCount_Value{name="user$i",role="Master",tenantId="default"}""")) + assert(res.contains( + s"""metrics_hdfsBytesWritten_Value{name="user$i",role="Master",tenantId="default"}""")) + assertFalse(quotaManager.checkUserQuotaStatus(user).isAvailable) + (0 until 1000).foreach { + index => + val appId = s"$user$i app$index" + assertFalse(quotaManager.checkApplicationQuotaStatus(appId).isAvailable) + } + } + clearUserConsumption() + } + + test("test handleResourceConsumption time - case2") { + // 1000 users 2000000 applications, all exceeded + conf.set("celeborn.quota.tenant.diskBytesWritten", "1mb") + conf.set("celeborn.quota.cluster.diskBytesWritten", "1mb") + configService.refreshCache() + val MAX = 2L * 1024 * 1024 * 1024 + val MIN = 1L * 1024 * 1024 * 1024 + val random = new Random() + for (i <- 0 until 1000) { + val user = UserIdentifier("default", s"user$i") + val subResourceConsumption = + if (i < 100) { + (0 until 1000).map { + index => + val appId = s"$user$i app$index" + val consumption = ResourceConsumption( + MIN + Math.abs(random.nextLong()) % (MAX - MIN), + MIN + Math.abs(random.nextLong()) % (MAX - MIN), + MIN + Math.abs(random.nextLong()) % (MAX - MIN), + MIN + Math.abs(random.nextLong()) % (MAX - MIN)) + (appId, consumption) + }.toMap + } else { + (0 until 1000).map { + index => + val appId = s"$user$i app$index" + val consumption = ResourceConsumption(0, 0, 0, 0) + (appId, consumption) + }.toMap + } + val userConsumption = subResourceConsumption.values.foldRight( + ResourceConsumption(0, 0, 0, 0))(_ add _) + userConsumption.subResourceConsumptions = subResourceConsumption.asJava + addUserConsumption(user, userConsumption) + } + + val start = System.currentTimeMillis() + quotaManager.updateResourceConsumption() + val duration = System.currentTimeMillis() - start + print(s"duration=$duration") + + val res = resourceConsumptionSource.getMetrics() + for (i <- 0 until 1000) { + val user = UserIdentifier("default", 
s"user$i") + assert(res.contains( + s"""metrics_diskFileCount_Value{name="user$i",role="Master",tenantId="default"}""")) + assert(res.contains( + s"""metrics_diskFileCount_Value{name="user$i",role="Master",tenantId="default"}""")) + assert(res.contains( + s"""metrics_hdfsFileCount_Value{name="user$i",role="Master",tenantId="default"}""")) + assert(res.contains( + s"""metrics_hdfsBytesWritten_Value{name="user$i",role="Master",tenantId="default"}""")) + if (i < 100) { + assertFalse(quotaManager.checkUserQuotaStatus(user).isAvailable) + } else { + assertTrue(quotaManager.checkUserQuotaStatus(user).isAvailable) + } + (0 until 1000).foreach { + index => + val appId = s"$user$i app$index" + if (i < 100) { + assertFalse(quotaManager.checkApplicationQuotaStatus(appId).isAvailable) + } else { + assertTrue(quotaManager.checkApplicationQuotaStatus(appId).isAvailable) + } + } + } + clearUserConsumption() + } + + test("test user level conf") { + val conf1 = new CelebornConf() + conf1.set(CelebornConf.DYNAMIC_CONFIG_STORE_BACKEND, "FS") + conf1.set( + CelebornConf.DYNAMIC_CONFIG_STORE_FS_PATH.key, + getTestResourceFile("dynamicConfig-quota-2.yaml").getPath) + val statusSystem = new SingleMasterMetaManager( + RpcEnv.create( + "test-rpc", + TransportModuleConstants.RPC_SERVICE_MODULE, + "localhost", + 9001, + conf1, + None), + conf1) + statusSystem.workers.add(worker) + val quotaManager1 = new QuotaManager( + statusSystem, + new MasterSource(conf1), + resourceConsumptionSource, + conf1, + new FsConfigServiceImpl(conf1)) + + val user = UserIdentifier("tenant_01", "Jerry") + val user1 = UserIdentifier("tenant_01", "John") + + val rc = + ResourceConsumption( + Utils.byteStringAsBytes("200G"), + 20000, + Utils.byteStringAsBytes("30G"), + 40) + rc.withSubResourceConsumptions( + Map( + "app1" -> ResourceConsumption( + Utils.byteStringAsBytes("150G"), + 15000, + Utils.byteStringAsBytes("25G"), + 20), + "app2" -> ResourceConsumption( + Utils.byteStringAsBytes("50G"), + 5000, + 
Utils.byteStringAsBytes("5G"), + 20)).asJava) + + val rc1 = + ResourceConsumption( + Utils.byteStringAsBytes("80G"), + 0, + 0, + 0) + + rc1.withSubResourceConsumptions( + Map( + "app3" -> ResourceConsumption( + Utils.byteStringAsBytes("80G"), + 0, + 0, + 0)).asJava) + + addUserConsumption(user, rc) + addUserConsumption(user1, rc1) + + quotaManager1.updateResourceConsumption() + val res1 = quotaManager1.checkUserQuotaStatus(user) + val res2 = quotaManager1.checkApplicationQuotaStatus("app1") + val res3 = quotaManager1.checkApplicationQuotaStatus("app2") + val res4 = quotaManager1.checkApplicationQuotaStatus("app3") + assert(res1 == CheckQuotaResponse( + false, + s"Interrupt or reject application caused by the user storage usage reach threshold. " + + s"user: `tenant_01`.`Jerry`. " + + s"DISK_BYTES_WRITTEN(200.0 GiB) exceeds quota(100.0 GiB). " + + s"DISK_FILE_COUNT(20000) exceeds quota(10000). " + + s"HDFS_BYTES_WRITTEN(30.0 GiB) exceeds quota(10.0 GiB). ")) + assert(res2 == CheckQuotaResponse( + false, + "Interrupt or reject application caused by the user storage usage reach threshold. " + + "Used: ResourceConsumption(" + + "diskBytesWritten: 150.0 GiB, " + + "diskFileCount: 15000, " + + "hdfsBytesWritten: 25.0 GiB, " + + "hdfsFileCount: 20), " + + "Threshold: " + + "Quota[" + + "diskBytesWritten=100.0 GiB, " + + "diskFileCount=10000, " + + "hdfsBytesWritten=10.0 GiB, " + + "hdfsFileCount=9223372036854775807]")) + assert(res3 == CheckQuotaResponse(true, "")) + assert(res4 == CheckQuotaResponse( + false, + "Interrupt or reject application caused by the user storage usage reach threshold. 
" + + "Used: " + + "ResourceConsumption(" + + "diskBytesWritten: 80.0 GiB, " + + "diskFileCount: 0, " + + "hdfsBytesWritten: 0.0 B, " + + "hdfsFileCount: 0), " + + "Threshold: " + + "Quota[" + + "diskBytesWritten=10.0 GiB, " + + "diskFileCount=1000, " + + "hdfsBytesWritten=10.0 GiB, " + + "hdfsFileCount=9223372036854775807]")) + + clearUserConsumption() + } + + def checkUserQuota(userIdentifier: UserIdentifier): CheckQuotaResponse = { + quotaManager.checkUserQuotaStatus(userIdentifier) + } + + def checkApplicationQuota( + userIdentifier: UserIdentifier, + applicationId: String): CheckQuotaResponse = { + quotaManager.checkApplicationQuotaStatus(applicationId) + } + + def addUserConsumption( + userIdentifier: UserIdentifier, + resourceConsumption: ResourceConsumption): Unit = { + worker.userResourceConsumption.put(userIdentifier, resourceConsumption) + } + + def clearUserConsumption(): Unit = { + val applicationSet = worker.userResourceConsumption.asScala.values.flatMap { consumption => + Option(consumption.subResourceConsumptions).map(_.asScala.keySet) + }.flatten.toSet + + applicationSet.foreach(quotaManager.handleAppLost) + worker.userResourceConsumption.clear() } } diff --git a/service/src/main/java/org/apache/celeborn/server/common/service/config/DynamicConfig.java b/service/src/main/java/org/apache/celeborn/server/common/service/config/DynamicConfig.java index c48bbe1fb56..88d31fc9f82 100644 --- a/service/src/main/java/org/apache/celeborn/server/common/service/config/DynamicConfig.java +++ b/service/src/main/java/org/apache/celeborn/server/common/service/config/DynamicConfig.java @@ -25,7 +25,7 @@ import org.apache.celeborn.common.CelebornConf; import org.apache.celeborn.common.internal.config.ConfigEntry; -import org.apache.celeborn.common.quota.Quota; +import org.apache.celeborn.common.quota.StorageQuota; import org.apache.celeborn.common.util.Utils; /** @@ -40,7 +40,9 @@ public abstract class DynamicConfig { private static final Logger LOG = 
LoggerFactory.getLogger(DynamicConfig.class); protected Map configs = new HashMap<>(); - protected volatile Quota quota = null; + protected volatile StorageQuota tenantStorageQuota = null; + + protected volatile StorageQuota clusterStorageQuota = null; public abstract DynamicConfig getParentLevelConfig(); @@ -92,19 +94,19 @@ public T formatValue( return null; } - public Quota getQuota() { - if (quota == null) { + public StorageQuota getTenantStorageQuota() { + if (tenantStorageQuota == null) { synchronized (DynamicConfig.class) { - if (quota == null) { - quota = currentQuota(); + if (tenantStorageQuota == null) { + tenantStorageQuota = currentTenantQuota(); } } } - return quota; + return tenantStorageQuota; } - protected Quota currentQuota() { - return new Quota( + protected StorageQuota currentTenantQuota() { + return new StorageQuota( getValue( CelebornConf.QUOTA_DISK_BYTES_WRITTEN().key(), CelebornConf.QUOTA_DISK_BYTES_WRITTEN(), @@ -127,6 +129,49 @@ protected Quota currentQuota() { ConfigType.STRING)); } + public StorageQuota getClusterStorageQuota() { + if (clusterStorageQuota == null) { + synchronized (DynamicConfig.class) { + if (clusterStorageQuota == null) { + clusterStorageQuota = currentClusterQuota(); + } + } + } + return clusterStorageQuota; + } + + protected StorageQuota currentClusterQuota() { + return new StorageQuota( + getValue( + CelebornConf.QUOTA_CLUSTER_DISK_BYTES_WRITTEN().key(), + CelebornConf.QUOTA_CLUSTER_DISK_BYTES_WRITTEN(), + Long.TYPE, + ConfigType.BYTES), + getValue( + CelebornConf.QUOTA_CLUSTER_DISK_FILE_COUNT().key(), + CelebornConf.QUOTA_CLUSTER_DISK_FILE_COUNT(), + Long.TYPE, + ConfigType.STRING), + getValue( + CelebornConf.QUOTA_CLUSTER_HDFS_BYTES_WRITTEN().key(), + CelebornConf.QUOTA_CLUSTER_HDFS_BYTES_WRITTEN(), + Long.TYPE, + ConfigType.BYTES), + getValue( + CelebornConf.QUOTA_CLUSTER_HDFS_FILE_COUNT().key(), + CelebornConf.QUOTA_CLUSTER_HDFS_FILE_COUNT(), + Long.TYPE, + ConfigType.STRING)); + } + + public boolean 
interruptShuffleEnabled() { + return getValue( + CelebornConf.QUOTA_INTERRUPT_SHUFFLE_ENABLED().key(), + CelebornConf.QUOTA_INTERRUPT_SHUFFLE_ENABLED(), + Boolean.TYPE, + ConfigType.BOOLEAN); + } + public Map getConfigs() { return configs; } @@ -143,6 +188,7 @@ public enum ConfigType { BYTES, STRING, TIME_MS, + BOOLEAN } public static T convert(Class clazz, String value) { diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index d0371e2bd4b..7f1d7cd0ecf 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -671,8 +671,14 @@ private[celeborn] class Worker( val resourceConsumptionSnapshot = storageManager.userResourceConsumptionSnapshot() val userResourceConsumptions = workerInfo.updateThenGetUserResourceConsumption(resourceConsumptionSnapshot.asJava) - resourceConsumptionSnapshot.foreach { case (userIdentifier, _) => + resourceConsumptionSnapshot.foreach { case (userIdentifier, userResourceConsumption) => gaugeResourceConsumption(userIdentifier) + val subResourceConsumptions = userResourceConsumption.subResourceConsumptions + if (CollectionUtils.isNotEmpty(subResourceConsumptions)) { + subResourceConsumptions.asScala.keys.foreach { + gaugeResourceConsumption(userIdentifier, _) + } + } } handleTopResourceConsumption(userResourceConsumptions) userResourceConsumptions diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala index 00e0a28878a..cc02f52b409 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala @@ -904,17 +904,14 @@ final 
private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs } // userIdentifier -> List((userIdentifier, (applicationId, fileInfo)))) .groupBy(_._1) - .map { case (userIdentifier, userWithFileInfoList) => + .mapValues { userWithFileInfoList => // collect resource consumed by each user on this worker - val userFileInfos = userWithFileInfoList.map(_._2) - ( - userIdentifier, - resourceConsumption( - userFileInfos.map(_._2), - userFileInfos.groupBy(_._1).map { - case (applicationId, appWithFileInfoList) => - (applicationId, resourceConsumption(appWithFileInfoList.map(_._2))) - }.asJava)) + val subResourceConsumption = userWithFileInfoList.map(_._2).groupBy(_._1).map { + case (applicationId, appWithFileInfoList) => + (applicationId, resourceConsumption(appWithFileInfoList.map(_._2))) + } + subResourceConsumption.values.foldLeft(ResourceConsumption(0, 0, 0, 0))(_ add _) + .withSubResourceConsumptions(subResourceConsumption.asJava) } } }