From a13e20b10f4ca77dee42ec43791de7b318e3210f Mon Sep 17 00:00:00 2001
From: Xianming Lei <31424839+leixm@users.noreply.github.com>
Date: Fri, 6 Dec 2024 14:31:11 +0800
Subject: [PATCH] Support tenant-level quota check and introduce
 celeborn.quota.user.* configs

---
 .../apache/celeborn/common/CelebornConf.scala |  66 +++++---
 docs/configuration/master.md                  |   2 -
 docs/configuration/quota.md                   |   4 +
 docs/migration.md                             |   8 +
 .../clustermeta/AbstractMetaManager.java      |   5 +-
 .../master/clustermeta/IMetadataHandler.java  |   1 -
 .../clustermeta/SingleMasterMetaManager.java  |  12 +-
 .../clustermeta/ha/HAMasterMetaManager.java   |   3 -
 .../service/deploy/master/Master.scala        |  10 +-
 .../deploy/master/quota/QuotaManager.scala    | 129 ++++++++++-----
 .../deploy/master/quota/QuotaStatus.scala     |   2 +
 .../test/resources/dynamicConfig-quota-2.yaml |  19 +--
 .../test/resources/dynamicConfig-quota-3.yaml |  37 +++++
 .../test/resources/dynamicConfig-quota.yaml   |  16 +-
 .../master/quota/QuotaManagerSuite.scala      | 153 +++++++++++++++---
 .../common/service/config/DynamicConfig.java  |  40 ++++-
 16 files changed, 361 insertions(+), 146 deletions(-)
 create mode 100644 master/src/test/resources/dynamicConfig-quota-3.yaml

diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 8b844bc59f0..f3a30abaa3d 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -665,8 +665,6 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
   def estimatedPartitionSizeForEstimationUpdateInterval: Long =
     get(ESTIMATED_PARTITION_SIZE_UPDATE_INTERVAL)
   def masterResourceConsumptionInterval: Long = get(MASTER_RESOURCE_CONSUMPTION_INTERVAL)
-  def masterUserDiskUsageThreshold: Long = get(MASTER_USER_DISK_USAGE_THRESHOLD)
-  def masterClusterDiskUsageThreshold: Long = get(MASTER_CLUSTER_DISK_USAGE_THRESHOLD)
   def clusterName: String = get(CLUSTER_NAME)

   // //////////////////////////////////////////////////////
@@ -2930,26 +2928,6 @@ object CelebornConf extends Logging {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("30s")

-  val MASTER_USER_DISK_USAGE_THRESHOLD: ConfigEntry[Long] =
-    buildConf("celeborn.master.userResourceConsumption.user.threshold")
-      .categories("master")
-      .doc("When user resource consumption exceeds quota, Master will " +
-        "interrupt some apps until user resource consumption is less " +
-        "than this value. Default value is Long.MaxValue which means disable check.")
-      .version("0.6.0")
-      .bytesConf(ByteUnit.BYTE)
-      .createWithDefault(Long.MaxValue)
-
-  val MASTER_CLUSTER_DISK_USAGE_THRESHOLD: ConfigEntry[Long] =
-    buildConf("celeborn.master.userResourceConsumption.cluster.threshold")
-      .categories("master")
-      .doc("When cluster resource consumption exceeds quota, Master will " +
-        "interrupt some apps until cluster resource consumption is less " +
-        "than this value. Default value is Long.MaxValue which means disable check.")
-      .version("0.6.0")
-      .bytesConf(ByteUnit.BYTE)
-      .createWithDefault(Long.MaxValue)
-
   val CLUSTER_NAME: ConfigEntry[String] =
     buildConf("celeborn.cluster.name")
       .categories("master", "worker")
@@ -5396,7 +5374,7 @@ object CelebornConf extends Logging {
       .stringConf
       .createWithDefault(IdentityProvider.DEFAULT_USERNAME)

-  val QUOTA_DISK_BYTES_WRITTEN: ConfigEntry[Long] =
+  val QUOTA_TENANT_DISK_BYTES_WRITTEN: ConfigEntry[Long] =
     buildConf("celeborn.quota.tenant.diskBytesWritten")
       .categories("quota")
       .dynamic
       .doc("Quota dynamic configuration for written disk bytes.")
       .version("0.5.0")
       .bytesConf(ByteUnit.BYTE)
       .createWithDefault(Long.MaxValue)
@@ -5405,7 +5383,7 @@ object CelebornConf extends Logging {
-  val QUOTA_DISK_FILE_COUNT: ConfigEntry[Long] =
+  val QUOTA_TENANT_DISK_FILE_COUNT: ConfigEntry[Long] =
     buildConf("celeborn.quota.tenant.diskFileCount")
       .categories("quota")
       .dynamic
       .doc("Quota dynamic configuration for written disk file count.")
       .version("0.5.0")
       .bytesConf(ByteUnit.BYTE)
       .createWithDefault(Long.MaxValue)
@@ -5414,7 +5392,7 @@ object CelebornConf extends Logging {
-  val QUOTA_HDFS_BYTES_WRITTEN: ConfigEntry[Long] =
+  val QUOTA_TENANT_HDFS_BYTES_WRITTEN: ConfigEntry[Long] =
     buildConf("celeborn.quota.tenant.hdfsBytesWritten")
       .categories("quota")
       .dynamic
       .doc("Quota dynamic configuration for written hdfs bytes.")
       .version("0.5.0")
       .bytesConf(ByteUnit.BYTE)
       .createWithDefault(Long.MaxValue)
@@ -5423,7 +5401,7 @@ object CelebornConf extends Logging {
-  val QUOTA_HDFS_FILE_COUNT: ConfigEntry[Long] =
+  val QUOTA_TENANT_HDFS_FILE_COUNT: ConfigEntry[Long] =
     buildConf("celeborn.quota.tenant.hdfsFileCount")
       .categories("quota")
       .dynamic
@@ -6072,4 +6050,40 @@ object CelebornConf extends Logging {
       .version("0.6.0")
       .longConf
       .createWithDefault(Long.MaxValue)
+
+  val QUOTA_USER_DISK_BYTES_WRITTEN: ConfigEntry[Long] =
+    buildConf("celeborn.quota.user.diskBytesWritten")
+      .categories("quota")
+      .dynamic
+      .doc("Quota dynamic configuration for user written disk bytes.")
+      .version("0.6.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(Long.MaxValue)
+
+  val QUOTA_USER_DISK_FILE_COUNT: ConfigEntry[Long] =
+    buildConf("celeborn.quota.user.diskFileCount")
+      .categories("quota")
+      .dynamic
+      .doc("Quota dynamic configuration for user written disk file count.")
+      .version("0.6.0")
+      .longConf
+      .createWithDefault(Long.MaxValue)
+
+  val QUOTA_USER_HDFS_BYTES_WRITTEN: ConfigEntry[Long] =
+    buildConf("celeborn.quota.user.hdfsBytesWritten")
+      .categories("quota")
+      .dynamic
+      .doc("Quota dynamic configuration for user written hdfs bytes.")
+      .version("0.6.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(Long.MaxValue)
+
+  val QUOTA_USER_HDFS_FILE_COUNT: ConfigEntry[Long] =
+    buildConf("celeborn.quota.user.hdfsFileCount")
+      .categories("quota")
+      .dynamic
+      .doc("Quota dynamic configuration for user written hdfs file count.")
+      .version("0.6.0")
+      .longConf
+      .createWithDefault(Long.MaxValue)
 }
diff --git a/docs/configuration/master.md b/docs/configuration/master.md
index 6d5ea329609..5368296a779 100644
--- a/docs/configuration/master.md
+++ b/docs/configuration/master.md
@@ -79,9 +79,7 @@ license: |
 | celeborn.master.slot.assign.loadAware.numDiskGroups | 5 | false | This configuration is a guidance for load-aware slot allocation algorithm. This value is control how many disk groups will be created. | 0.3.0 | celeborn.slots.assign.loadAware.numDiskGroups |
 | celeborn.master.slot.assign.maxWorkers | 10000 | false | Max workers that slots of one shuffle can be allocated on. Will choose the smaller positive one from Master side and Client side, see `celeborn.client.slot.assign.maxWorkers`. | 0.3.1 | |
 | celeborn.master.slot.assign.policy | ROUNDROBIN | false | Policy for master to assign slots, Celeborn supports two types of policy: roundrobin and loadaware. Loadaware policy will be ignored when `HDFS` is enabled in `celeborn.storage.availableTypes` | 0.3.0 | celeborn.slots.assign.policy |
-| celeborn.master.userResourceConsumption.cluster.threshold | 9223372036854775807b | false | When cluster resource consumption exceeds quota, Master will interrupt some apps until cluster resource consumption is less than this value. Default value is Long.MaxValue which means disable check. | 0.6.0 | |
 | celeborn.master.userResourceConsumption.update.interval | 30s | false | Time length for a window about compute user resource consumption. | 0.3.0 | |
-| celeborn.master.userResourceConsumption.user.threshold | 9223372036854775807b | false | When user resource consumption exceeds quota, Master will interrupt some apps until user resource consumption is less than this value. Default value is Long.MaxValue which means disable check. | 0.6.0 | |
 | celeborn.master.workerUnavailableInfo.expireTimeout | 1800s | false | Worker unavailable info would be cleared when the retention period is expired. Set -1 to disable the expiration. | 0.3.1 | |
 | celeborn.quota.enabled | true | false | When Master side sets to true, the master will enable to check the quota via QuotaManager. When Client side sets to true, LifecycleManager will request Master side to check whether the current user has enough quota before registration of shuffle. Fallback to the default shuffle service when Master side checks that there is no enough quota for current user. | 0.2.0 | |
 | celeborn.redaction.regex | (?i)secret|password|token|access[.]key | false | Regex to decide which Celeborn configuration properties and environment variables in master and worker environments contain sensitive information. When this regex matches a property key or value, the value is redacted from the logging. | 0.5.0 | |
diff --git a/docs/configuration/quota.md b/docs/configuration/quota.md
index 2b0c5fb1648..ac1617f2d78 100644
--- a/docs/configuration/quota.md
+++ b/docs/configuration/quota.md
@@ -32,4 +32,8 @@ license: |
 | celeborn.quota.tenant.diskFileCount | 9223372036854775807b | true | Quota dynamic configuration for written disk file count. | 0.5.0 | |
 | celeborn.quota.tenant.hdfsBytesWritten | 9223372036854775807b | true | Quota dynamic configuration for written hdfs bytes. | 0.5.0 | |
 | celeborn.quota.tenant.hdfsFileCount | 9223372036854775807 | true | Quota dynamic configuration for written hdfs file count. | 0.5.0 | |
+| celeborn.quota.user.diskBytesWritten | 9223372036854775807b | true | Quota dynamic configuration for user written disk bytes. | 0.6.0 | |
+| celeborn.quota.user.diskFileCount | 9223372036854775807 | true | Quota dynamic configuration for user written disk file count. | 0.6.0 | |
+| celeborn.quota.user.hdfsBytesWritten | 9223372036854775807b | true | Quota dynamic configuration for user written hdfs bytes. | 0.6.0 | |
+| celeborn.quota.user.hdfsFileCount | 9223372036854775807 | true | Quota dynamic configuration for user written hdfs file count. | 0.6.0 | |
diff --git a/docs/migration.md b/docs/migration.md
index b7f36b04183..3ba74e1323f 100644
--- a/docs/migration.md
+++ b/docs/migration.md
@@ -23,6 +23,14 @@ license: |

 # Upgrading from 0.5 to 0.6

+- Since 0.6.0, Celeborn has renamed `celeborn.quota.tenant.diskBytesWritten` to `celeborn.quota.user.diskBytesWritten`. Please use `celeborn.quota.user.diskBytesWritten` if you want to set a user-level quota.
+
+- Since 0.6.0, Celeborn has renamed `celeborn.quota.tenant.diskFileCount` to `celeborn.quota.user.diskFileCount`. Please use `celeborn.quota.user.diskFileCount` if you want to set a user-level quota.
+
+- Since 0.6.0, Celeborn has renamed `celeborn.quota.tenant.hdfsBytesWritten` to `celeborn.quota.user.hdfsBytesWritten`. Please use `celeborn.quota.user.hdfsBytesWritten` if you want to set a user-level quota.
+
+- Since 0.6.0, Celeborn has renamed `celeborn.quota.tenant.hdfsFileCount` to `celeborn.quota.user.hdfsFileCount`. Please use `celeborn.quota.user.hdfsFileCount` if you want to set a user-level quota.
+
 - Since 0.6.0, Celeborn deprecate `celeborn.worker.congestionControl.low.watermark`. Please use `celeborn.worker.congestionControl.diskBuffer.low.watermark` instead.

 - Since 0.6.0, Celeborn deprecate `celeborn.worker.congestionControl.high.watermark`. Please use `celeborn.worker.congestionControl.diskBuffer.high.watermark` instead.
diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index cf37796bf42..163d212d832 100644
--- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -250,13 +250,11 @@ public void updateWorkerHeartbeatMeta(
       int fetchPort,
       int replicatePort,
       Map<String, DiskInfo> disks,
-      Map<UserIdentifier, ResourceConsumption> userResourceConsumption,
       long time,
       WorkerStatus workerStatus,
       boolean highWorkload) {
     WorkerInfo worker =
-        new WorkerInfo(
-            host, rpcPort, pushPort, fetchPort, replicatePort, -1, disks, userResourceConsumption);
+        new WorkerInfo(host, rpcPort, pushPort, fetchPort, replicatePort, -1, disks, null);
     AtomicLong availableSlots = new AtomicLong();
     LOG.debug("update worker {}:{} heartbeat {}", host, rpcPort, disks);
     synchronized (workersMap) {
@@ -264,7 +262,6 @@ public void updateWorkerHeartbeatMeta(
       workerInfo.ifPresent(
           info -> {
             info.updateThenGetDiskInfos(disks, Option.apply(estimatedPartitionSize));
-            info.updateThenGetUserResourceConsumption(userResourceConsumption);
             availableSlots.set(info.totalAvailableSlots());
             info.lastHeartbeat_$eq(time);
             info.setWorkerStatus(workerStatus);
diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
index c2513563466..52fc783b4ea 100644
--- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
+++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
@@ -66,7 +66,6 @@ void handleWorkerHeartbeat(
       int fetchPort,
       int replicatePort,
       Map<String, DiskInfo> disks,
-      Map<UserIdentifier, ResourceConsumption> userResourceConsumption,
       long time,
       boolean highWorkload,
       WorkerStatus workerStatus,
diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
index 765beb791c1..7a3ce249fdb 100644
--- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
+++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
@@ -118,22 +118,12 @@ public void handleWorkerHeartbeat(
       int fetchPort,
       int replicatePort,
       Map<String, DiskInfo> disks,
-      Map<UserIdentifier, ResourceConsumption> userResourceConsumption,
       long time,
       boolean highWorkload,
       WorkerStatus workerStatus,
       String requestId) {
     updateWorkerHeartbeatMeta(
-        host,
-        rpcPort,
-        pushPort,
-        fetchPort,
-        replicatePort,
-        disks,
-        userResourceConsumption,
-        time,
-        workerStatus,
-        highWorkload);
+        host, rpcPort, pushPort, fetchPort, replicatePort, disks, time, workerStatus, highWorkload);
   }

   @Override
diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
index 1b50b5d7adf..f8a8da925dc 100644
--- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
+++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
@@ -272,7 +272,6 @@ public void handleWorkerHeartbeat(
       int fetchPort,
       int replicatePort,
       Map<String, DiskInfo> disks,
-      Map<UserIdentifier, ResourceConsumption> userResourceConsumption,
       long time,
       boolean highWorkload,
       WorkerStatus workerStatus,
@@ -290,8 +289,6 @@ public void handleWorkerHeartbeat(
                 .setFetchPort(fetchPort)
                 .setReplicatePort(replicatePort)
                 .putAllDisks(MetaUtil.toPbDiskInfos(disks))
-                .putAllUserResourceConsumption(
-                    MetaUtil.toPbUserResourceConsumption(userResourceConsumption))
                 .setWorkerStatus(MetaUtil.toPbWorkerStatus(workerStatus))
                 .setTime(time)
                 .setHighWorkload(highWorkload)
diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 4784f284cac..e74d6a9f9f6 100644
--- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -186,16 +186,16 @@ private[celeborn] class Master(
   private val hasHDFSStorage = conf.hasHDFSStorage
   private val hasS3Storage = conf.hasS3Storage

+  // workerUniqueId -> (userIdentifier -> ResourceConsumption)
+  private val workerToResourceConsumptions =
+    JavaUtils.newConcurrentHashMap[String, util.Map[UserIdentifier, ResourceConsumption]]()
   private val quotaManager = new QuotaManager(
-    statusSystem,
+    workerToResourceConsumptions,
     masterSource,
     resourceConsumptionSource,
     conf,
     configService)
   private val tagsManager = new TagsManager(Option(configService))
-  private val masterResourceConsumptionInterval = conf.masterResourceConsumptionInterval
-  private val userResourceConsumptions =
-    JavaUtils.newConcurrentHashMap[UserIdentifier, (ResourceConsumption, Long)]()

   private val slotsAssignMaxWorkers = conf.masterSlotAssignMaxWorkers
   private val slotsAssignLoadAwareDiskGroupNum = conf.masterSlotAssignLoadAwareDiskGroupNum
@@ -670,13 +670,13 @@ private[celeborn] class Master(
         fetchPort,
         replicatePort,
         disks.map { disk => disk.mountPoint -> disk }.toMap.asJava,
-        userResourceConsumption,
         System.currentTimeMillis(),
         highWorkload,
         workerStatus,
         requestId)
     }

+    workerToResourceConsumptions.put(targetWorker.toUniqueId(), userResourceConsumption)
    val expiredShuffleKeys = new util.HashSet[String]
    activeShuffleKeys.asScala.foreach { shuffleKey =>
      val (appId, shuffleId) = Utils.splitShuffleKey(shuffleKey)
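
The heartbeat path above no longer persists per-user resource consumption in cluster metadata (and, per the HAMasterMetaManager change, no longer ships it in the replicated heartbeat request); the Master now just keeps the latest snapshot per worker in a local concurrent map that QuotaManager reads on its own schedule. Below is a minimal runnable sketch of that flow, with simplified stand-in types rather than the Celeborn classes:

import java.util.concurrent.ConcurrentHashMap

object HeartbeatSnapshotSketch {
  // Stand-ins for UserIdentifier and ResourceConsumption.
  case class User(tenantId: String, name: String)
  case class Consumption(diskBytesWritten: Long)

  // workerUniqueId -> latest per-user consumption reported by that worker.
  val workerToResourceConsumptions =
    new ConcurrentHashMap[String, Map[User, Consumption]]()

  // Each heartbeat replaces the previous snapshot for that worker, so the map
  // holds one entry per live worker; QuotaManager later flattens the values and
  // aggregates them per user, tenant, and cluster.
  def onHeartbeat(workerUniqueId: String, reported: Map[User, Consumption]): Unit =
    workerToResourceConsumptions.put(workerUniqueId, reported)

  def main(args: Array[String]): Unit = {
    onHeartbeat("host1:9095", Map(User("tenant_01", "Jerry") -> Consumption(1L << 30)))
    onHeartbeat("host1:9095", Map(User("tenant_01", "Jerry") -> Consumption(2L << 30)))
    println(workerToResourceConsumptions) // only the latest snapshot survives
  }
}
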
diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManager.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManager.scala
index 2011975539f..8a2eccbde98 100644
--- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManager.scala
+++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManager.scala
@@ -32,17 +32,19 @@
 import org.apache.celeborn.common.util.{JavaUtils, ThreadUtils, Utils}
 import org.apache.celeborn.server.common.service.config.ConfigService
 import org.apache.celeborn.service.deploy.master.MasterSource
 import org.apache.celeborn.service.deploy.master.MasterSource.UPDATE_RESOURCE_CONSUMPTION_TIME
-import org.apache.celeborn.service.deploy.master.clustermeta.AbstractMetaManager
 import org.apache.celeborn.service.deploy.master.quota.QuotaStatus._

 class QuotaManager(
-    statusSystem: AbstractMetaManager,
+    workerToResourceConsumptions: JMap[String, JMap[UserIdentifier, ResourceConsumption]],
     masterSource: MasterSource,
     resourceConsumptionSource: ResourceConsumptionSource,
     celebornConf: CelebornConf,
     configService: ConfigService)
   extends Logging {
   val userQuotaStatus: JMap[UserIdentifier, QuotaStatus] = JavaUtils.newConcurrentHashMap()
+  val tenantQuotaStatus: JMap[String, QuotaStatus] = JavaUtils.newConcurrentHashMap()
+  @volatile
+  var clusterQuotaStatus: QuotaStatus = new QuotaStatus()
   val appQuotaStatus: JMap[String, QuotaStatus] = JavaUtils.newConcurrentHashMap()
   val userResourceConsumptionMap: JMap[UserIdentifier, ResourceConsumption] =
     JavaUtils.newConcurrentHashMap()
@@ -78,7 +80,13 @@ class QuotaManager(
   def getUserStorageQuota(user: UserIdentifier): StorageQuota = {
     Option(configService)
-      .map(_.getTenantUserConfigFromCache(user.tenantId, user.name).getTenantStorageQuota)
+      .map(_.getTenantUserConfigFromCache(user.tenantId, user.name).getUserStorageQuota)
       .getOrElse(StorageQuota.DEFAULT_QUOTA)
   }

+  def getTenantStorageQuota(tenantId: String): StorageQuota = {
+    Option(configService)
+      .map(_.getTenantConfigFromCache(tenantId).getTenantStorageQuota)
+      .getOrElse(StorageQuota.DEFAULT_QUOTA)
+  }
+
@@ -101,11 +109,17 @@ class QuotaManager(
     checkQuotaSpace(s"$USER_EXHAUSTED user: $user. ", consumption, quota)
   }

-  private def checkClusterQuotaSpace(
-      consumption: ResourceConsumption,
-      quota: StorageQuota): QuotaStatus = {
-    checkQuotaSpace(CLUSTER_EXHAUSTED, consumption, quota)
+  private def checkTenantQuotaSpace(
+      tenantId: String,
+      consumption: ResourceConsumption): QuotaStatus = {
+    val quota = getTenantStorageQuota(tenantId)
+    checkQuotaSpace(s"$TENANT_EXHAUSTED tenant: $tenantId. ", consumption, quota)
+  }
+
+  private def checkClusterQuotaSpace(consumption: ResourceConsumption): QuotaStatus = {
+    checkQuotaSpace(CLUSTER_EXHAUSTED, consumption, getClusterStorageQuota)
   }
+
   private def checkQuotaSpace(
       reason: String,
       consumption: ResourceConsumption,
@@ -168,44 +182,77 @@ class QuotaManager(
     masterSource.sample(UPDATE_RESOURCE_CONSUMPTION_TIME, this.getClass.getSimpleName, Map.empty) {
       val clusterQuota = getClusterStorageQuota
       var clusterResourceConsumption = ResourceConsumption(0, 0, 0, 0)
-      val userResourceConsumption =
-        statusSystem.availableWorkers.asScala.toList.flatMap { workerInfo =>
-          workerInfo.userResourceConsumption.asScala
-        }.groupBy(_._1).map { case (userIdentifier, userConsumptionList) =>
-          // Step 1: Compute user consumption and set quota status.
-          val resourceConsumptionList = userConsumptionList.map(_._2)
-          val resourceConsumption = computeUserResourceConsumption(resourceConsumptionList)
-          userQuotaStatus.put(
-            userIdentifier,
-            checkUserQuotaSpace(userIdentifier, resourceConsumption))
+      val tenantResourceConsumption =
+        workerToResourceConsumptions.asScala.flatMap(_._2.asScala).groupBy(_._1.tenantId).map {
+          case (tenantId, tenantConsumptionList) =>
+            var tenantResourceConsumption = ResourceConsumption(0, 0, 0, 0)
+            val userResourceConsumption =
+              tenantConsumptionList.groupBy(_._1).map {
+                case (userIdentifier, userConsumptionList) =>
+                  // Step 1: Compute user consumption and set quota status.
+                  val resourceConsumptionList = userConsumptionList.map(_._2).toSeq
+                  val resourceConsumption = computeUserResourceConsumption(resourceConsumptionList)
+
+                  // Step 2: Update user resource consumption metrics.
+                  // For extracting metrics
+                  userResourceConsumptionMap.put(userIdentifier, resourceConsumption)
+                  registerUserResourceConsumptionMetrics(userIdentifier)
+
+                  // Step 3: Expire user-level exceeded apps, except apps already expired
+                  clusterResourceConsumption = clusterResourceConsumption.add(resourceConsumption)
+                  tenantResourceConsumption = tenantResourceConsumption.add(resourceConsumption)
+                  val quotaStatus = checkUserQuotaSpace(userIdentifier, resourceConsumption)
+                  userQuotaStatus.put(userIdentifier, quotaStatus)
+                  if (interruptShuffleEnabled && quotaStatus.exceed) {
+                    val subResourceConsumptions = computeSubConsumption(resourceConsumptionList)
+                    // Compute expired size
+                    val (expired, notExpired) = subResourceConsumptions.partition { case (app, _) =>
+                      appQuotaStatus.containsKey(app)
+                    }
+                    val userConsumptions =
+                      expired.values.foldLeft(resourceConsumption)(_.subtract(_))
+                    expireApplication(
+                      userConsumptions,
+                      getUserStorageQuota(userIdentifier),
+                      notExpired.toSeq,
+                      USER_EXHAUSTED)
+                    (Option(subResourceConsumptions), resourceConsumptionList)
+                  } else {
+                    (None, resourceConsumptionList)
+                  }
+              }
-          // Step 2: Update user resource consumption metrics.
-          // For extract metrics
-          userResourceConsumptionMap.put(userIdentifier, resourceConsumption)
-          registerUserResourceConsumptionMetrics(userIdentifier)
+            val quotaStatus = checkTenantQuotaSpace(tenantId, tenantResourceConsumption)
+            tenantQuotaStatus.put(tenantId, quotaStatus)
+            // Step 4: Expire tenant-level exceeded apps, except apps already expired
+            if (interruptShuffleEnabled && quotaStatus.exceed) {
+              val appConsumptions = userResourceConsumption.map {
+                case (None, subConsumptionList) => computeSubConsumption(subConsumptionList)
+                case (Some(subConsumptions), _) => subConsumptions
+              }.flatMap(_.toSeq).toSeq

-          // Step 3: Expire user level exceeded app except already expired app
-          clusterResourceConsumption = clusterResourceConsumption.add(resourceConsumption)
-          val userQuota = getUserStorageQuota(userIdentifier)
-          if (interruptShuffleEnabled && checkConsumptionExceeded(resourceConsumption, userQuota)) {
-            val subResourceConsumptions = computeSubAppConsumption(resourceConsumptionList)
-            // Compute expired size
-            val (expired, notExpired) = subResourceConsumptions.partition { case (app, _) =>
-              appQuotaStatus.containsKey(app)
+              // Compute nonExpired app total usage
+              val (expired, notExpired) = appConsumptions.partition { case (app, _) =>
+                appQuotaStatus.containsKey(app)
+              }
+              tenantResourceConsumption =
+                expired.map(_._2).foldLeft(tenantResourceConsumption)(_.subtract(_))
+              expireApplication(
+                tenantResourceConsumption,
+                getTenantStorageQuota(tenantId),
+                notExpired,
+                TENANT_EXHAUSTED)
+              (Option(appConsumptions), tenantConsumptionList.map(_._2))
+            } else {
+              (None, tenantConsumptionList.map(_._2))
             }
-            val userConsumptions = expired.values.foldLeft(resourceConsumption)(_.subtract(_))
-            expireApplication(userConsumptions, userQuota, notExpired.toSeq, USER_EXHAUSTED)
-            (Option(subResourceConsumptions), resourceConsumptionList)
-          } else {
-            (None, resourceConsumptionList)
-          }
         }

-      // Step 4: Expire cluster level exceeded app except already expired app
-      if (interruptShuffleEnabled &&
-        checkClusterQuotaSpace(clusterResourceConsumption, clusterQuota).exceed) {
-        val appConsumptions = userResourceConsumption.map {
-          case (None, subConsumptionList) => computeSubAppConsumption(subConsumptionList)
+      // Step 5: Expire cluster-level exceeded apps, except apps already expired
+      clusterQuotaStatus = checkClusterQuotaSpace(clusterResourceConsumption)
+      if (interruptShuffleEnabled && clusterQuotaStatus.exceed) {
+        val appConsumptions = tenantResourceConsumption.map {
+          case (None, subConsumptionList) => computeSubConsumption(subConsumptionList.toSeq)
           case (Some(subConsumptions), _) => subConsumptions
         }.flatMap(_.toSeq).toSeq
@@ -248,7 +295,7 @@ class QuotaManager(
     consumptions.foldRight(ResourceConsumption(0, 0, 0, 0))(_ add _)
   }

-  private def computeSubAppConsumption(
+  private def computeSubConsumption(
       resourceConsumptionList: Seq[ResourceConsumption]): Map[String, ResourceConsumption] = {
     resourceConsumptionList.foldRight(Map.empty[String, ResourceConsumption]) {
       case (consumption, subConsumption) =>
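
The rewritten updateResourceConsumption makes a single roll-up pass: per-user totals aggregate into per-tenant totals and then into a cluster total, and each level is checked against its own quota. Below is a minimal runnable sketch of that roll-up — one dimension only, no app expiration, thresholds borrowed from dynamicConfig-quota-3.yaml — so only the shape matches the real implementation:

object QuotaRollUpSketch {
  case class User(tenantId: String, name: String)
  case class Quota(diskBytesWritten: Long = Long.MaxValue)
  case class Consumption(diskBytesWritten: Long) {
    def add(other: Consumption): Consumption = Consumption(diskBytesWritten + other.diskBytesWritten)
    def exceeds(quota: Quota): Boolean = diskBytesWritten > quota.diskBytesWritten
  }

  def checkAllLevels(
      perUser: Map[User, Consumption],
      userQuota: User => Quota,
      tenantQuota: String => Quota,
      clusterQuota: Quota): Seq[String] = {
    val exceeded = Seq.newBuilder[String]
    var cluster = Consumption(0)
    perUser.groupBy { case (user, _) => user.tenantId }.foreach { case (tenantId, users) =>
      var tenant = Consumption(0)
      users.foreach { case (user, consumption) =>
        tenant = tenant.add(consumption)
        if (consumption.exceeds(userQuota(user))) exceeded += s"user ${user.name}"
      }
      cluster = cluster.add(tenant)
      if (tenant.exceeds(tenantQuota(tenantId))) exceeded += s"tenant $tenantId"
    }
    if (cluster.exceeds(clusterQuota)) exceeded += "cluster"
    exceeded.result()
  }

  def main(args: Array[String]): Unit = {
    val gib = 1L << 30
    val perUser = Map(
      User("tenant_01", "Jerry") -> Consumption(230 * gib),
      User("tenant_01", "John") -> Consumption(220 * gib))
    // user quota 100 GiB, tenant quota 150 GiB, cluster quota 300 GiB
    println(checkAllLevels(perUser, _ => Quota(100 * gib), _ => Quota(150 * gib), Quota(300 * gib)))
    // -> List(user Jerry, user John, tenant tenant_01, cluster)
  }
}
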
String = "Interrupt application caused by the cluster storage usage reach threshold." + val TENANT_EXHAUSTED: String = + "Interrupt application caused by the tenant storage usage reach threshold." val USER_EXHAUSTED: String = "Interrupt or reject application caused by the user storage usage reach threshold." } diff --git a/master/src/test/resources/dynamicConfig-quota-2.yaml b/master/src/test/resources/dynamicConfig-quota-2.yaml index fd5e27aaa82..a30b7430ea6 100644 --- a/master/src/test/resources/dynamicConfig-quota-2.yaml +++ b/master/src/test/resources/dynamicConfig-quota-2.yaml @@ -16,23 +16,20 @@ # - level: SYSTEM config: - celeborn.quota.tenant.diskBytesWritten: 1000G - celeborn.quota.tenant.diskFileCount: 100 - celeborn.quota.tenant.hdfsBytesWritten: 1G + celeborn.quota.user.diskBytesWritten: 1000G + celeborn.quota.user.diskFileCount: 100 + celeborn.quota.user.hdfsBytesWritten: 1G celeborn.quota.cluster.diskBytesWritten: 130G celeborn.quota.interruptShuffle.enabled: true - tenantId: tenant_01 level: TENANT config: - celeborn.quota.tenant.diskBytesWritten: 10G - celeborn.quota.tenant.diskFileCount: 1000 - celeborn.quota.tenant.hdfsBytesWritten: 10G + celeborn.quota.user.diskBytesWritten: 10G + celeborn.quota.user.diskFileCount: 1000 + celeborn.quota.user.hdfsBytesWritten: 10G users: - name: Jerry config: - celeborn.quota.tenant.diskBytesWritten: 100G - celeborn.quota.tenant.diskFileCount: 10000 - celeborn.master.userResourceConsumption.user.threshold: 120G - - + celeborn.quota.user.diskBytesWritten: 100G + celeborn.quota.user.diskFileCount: 10000 diff --git a/master/src/test/resources/dynamicConfig-quota-3.yaml b/master/src/test/resources/dynamicConfig-quota-3.yaml new file mode 100644 index 00000000000..30711c07a6a --- /dev/null +++ b/master/src/test/resources/dynamicConfig-quota-3.yaml @@ -0,0 +1,37 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+#
+- level: SYSTEM
+  config:
+    celeborn.quota.cluster.diskBytesWritten: 300G
+    celeborn.quota.interruptShuffle.enabled: true
+
+- tenantId: tenant_01
+  level: TENANT
+  config:
+    celeborn.quota.tenant.diskBytesWritten: 150G
+    celeborn.quota.tenant.diskFileCount: 1500
+  users:
+    - name: Jerry
+      config:
+        celeborn.quota.user.diskBytesWritten: 100G
+        celeborn.quota.user.diskFileCount: 10000
+    - name: John
+      config:
+        celeborn.quota.user.diskBytesWritten: 100G
+        celeborn.quota.user.diskFileCount: 10000
diff --git a/master/src/test/resources/dynamicConfig-quota.yaml b/master/src/test/resources/dynamicConfig-quota.yaml
index 8a7d33d03f9..a829b7c9331 100644
--- a/master/src/test/resources/dynamicConfig-quota.yaml
+++ b/master/src/test/resources/dynamicConfig-quota.yaml
@@ -16,21 +16,21 @@
 #
 - level: SYSTEM
   config:
-    celeborn.quota.tenant.diskBytesWritten: 1G
-    celeborn.quota.tenant.diskFileCount: 100
-    celeborn.quota.tenant.hdfsBytesWritten: 1G
+    celeborn.quota.user.diskBytesWritten: 1G
+    celeborn.quota.user.diskFileCount: 100
+    celeborn.quota.user.hdfsBytesWritten: 1G
     celeborn.quota.interruptShuffle.enabled: true

 - tenantId: tenant_01
   level: TENANT
   config:
-    celeborn.quota.tenant.diskBytesWritten: 10G
-    celeborn.quota.tenant.diskFileCount: 1000
-    celeborn.quota.tenant.hdfsBytesWritten: 10G
+    celeborn.quota.user.diskBytesWritten: 10G
+    celeborn.quota.user.diskFileCount: 1000
+    celeborn.quota.user.hdfsBytesWritten: 10G
   users:
     - name: Jerry
       config:
-        celeborn.quota.tenant.diskBytesWritten: 100G
-        celeborn.quota.tenant.diskFileCount: 10000
+        celeborn.quota.user.diskBytesWritten: 100G
+        celeborn.quota.user.diskFileCount: 10000
diff --git a/master/src/test/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManagerSuite.scala b/master/src/test/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManagerSuite.scala
index 6f2649ddf0a..4b629ff32f7 100644
--- a/master/src/test/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManagerSuite.scala
+++ b/master/src/test/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManagerSuite.scala
@@ -17,6 +17,8 @@

 package org.apache.celeborn.service.deploy.master.quota

+import java.util
+
 import scala.collection.JavaConverters.{mapAsJavaMapConverter, mapAsScalaMapConverter}
 import scala.util.Random

@@ -29,14 +31,11 @@ import org.apache.celeborn.common.identity.UserIdentifier
 import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.meta.WorkerInfo
 import org.apache.celeborn.common.metrics.source.ResourceConsumptionSource
-import org.apache.celeborn.common.protocol.TransportModuleConstants
 import org.apache.celeborn.common.protocol.message.ControlMessages.CheckQuotaResponse
 import org.apache.celeborn.common.quota.{ResourceConsumption, StorageQuota}
-import org.apache.celeborn.common.rpc.RpcEnv
-import org.apache.celeborn.common.util.Utils
+import org.apache.celeborn.common.util.{JavaUtils, Utils}
 import org.apache.celeborn.server.common.service.config.{ConfigService, DynamicConfigServiceFactory, FsConfigServiceImpl}
 import org.apache.celeborn.service.deploy.master.MasterSource
-import org.apache.celeborn.service.deploy.master.clustermeta.SingleMasterMetaManager

 class QuotaManagerSuite extends CelebornFunSuite
   with BeforeAndAfterAll
@@ -53,6 +52,9 @@ class QuotaManagerSuite extends CelebornFunSuite
     10003,
     10004)

+  val workerToResourceConsumptions =
+    JavaUtils.newConcurrentHashMap[String, util.Map[UserIdentifier, ResourceConsumption]]()
+
   val conf = new CelebornConf()

   var configService: ConfigService = _

@@ -62,21 +64,12 @@ class QuotaManagerSuite extends CelebornFunSuite
     conf.set(
       CelebornConf.DYNAMIC_CONFIG_STORE_FS_PATH.key,
       getTestResourceFile("dynamicConfig-quota.yaml").getPath)
-    val statusSystem = new SingleMasterMetaManager(
-      RpcEnv.create(
-        "test-rpc",
-        TransportModuleConstants.RPC_SERVICE_MODULE,
-        "localhost",
-        9001,
-        conf,
-        None),
-      conf)
-    statusSystem.availableWorkers.add(worker)
     resourceConsumptionSource = new ResourceConsumptionSource(conf, "Master")
     DynamicConfigServiceFactory.reset()
     configService = DynamicConfigServiceFactory.getConfigService(conf)
+
     quotaManager = new QuotaManager(
-      statusSystem,
+      workerToResourceConsumptions,
       new MasterSource(conf),
       resourceConsumptionSource,
       conf,
@@ -423,18 +416,8 @@ class QuotaManagerSuite extends CelebornFunSuite
     conf1.set(
       CelebornConf.DYNAMIC_CONFIG_STORE_FS_PATH.key,
       getTestResourceFile("dynamicConfig-quota-2.yaml").getPath)
-    val statusSystem = new SingleMasterMetaManager(
-      RpcEnv.create(
-        "test-rpc",
-        TransportModuleConstants.RPC_SERVICE_MODULE,
-        "localhost",
-        9001,
-        conf1,
-        None),
-      conf1)
-    statusSystem.availableWorkers.add(worker)
     val quotaManager1 = new QuotaManager(
-      statusSystem,
+      workerToResourceConsumptions,
       new MasterSource(conf1),
       resourceConsumptionSource,
       conf1,
@@ -526,6 +509,123 @@ class QuotaManagerSuite extends CelebornFunSuite
     clearUserConsumption()
   }

+  test("test tenant level conf") {
+    val conf1 = new CelebornConf()
+    conf1.set(CelebornConf.DYNAMIC_CONFIG_STORE_BACKEND, "FS")
+    conf1.set(
+      CelebornConf.DYNAMIC_CONFIG_STORE_FS_PATH.key,
+      getTestResourceFile("dynamicConfig-quota-3.yaml").getPath)
+    val quotaManager1 = new QuotaManager(
+      workerToResourceConsumptions,
+      new MasterSource(conf1),
+      resourceConsumptionSource,
+      conf1,
+      new FsConfigServiceImpl(conf1))
+
+    val user1 = UserIdentifier("tenant_01", "Jerry")
+    val user2 = UserIdentifier("tenant_01", "John")
+
+    val rc1 =
+      ResourceConsumption(
+        Utils.byteStringAsBytes("230G"),
+        0,
+        0,
+        0)
+    rc1.withSubResourceConsumptions(
+      Map(
+        "app1" -> ResourceConsumption(
+          Utils.byteStringAsBytes("150G"),
+          0,
+          0,
+          0),
+        "app2" -> ResourceConsumption(
+          Utils.byteStringAsBytes("80G"),
+          0,
+          0,
+          0)).asJava)
+
+    val rc2 =
+      ResourceConsumption(
+        Utils.byteStringAsBytes("220G"),
+        0,
+        0,
+        0)
+
+    rc2.withSubResourceConsumptions(
+      Map(
+        "app3" -> ResourceConsumption(
+          Utils.byteStringAsBytes("150G"),
+          0,
+          0,
+          0),
+        "app4" -> ResourceConsumption(
+          Utils.byteStringAsBytes("70G"),
+          0,
+          0,
+          0)).asJava)
+
+    addUserConsumption(user1, rc1)
+    addUserConsumption(user2, rc2)
+
+    quotaManager1.updateResourceConsumption()
+    val res1 = quotaManager1.checkUserQuotaStatus(user1)
+    val res2 = quotaManager1.checkUserQuotaStatus(user2)
+    val res3 = quotaManager1.checkApplicationQuotaStatus("app1")
+    val res4 = quotaManager1.checkApplicationQuotaStatus("app2")
+    val res5 = quotaManager1.checkApplicationQuotaStatus("app3")
+    val res6 = quotaManager1.checkApplicationQuotaStatus("app4")
+    assert(res1 == CheckQuotaResponse(
+      false,
+      "" +
+        "Interrupt or reject application caused by the user storage usage reach threshold. " +
+        "user: `tenant_01`.`Jerry`. DISK_BYTES_WRITTEN(230.0 GiB) exceeds quota(100.0 GiB). "))
+    assert(res2 == CheckQuotaResponse(
+      false,
+      "Interrupt or reject application caused by the user storage usage reach threshold. " +
+        "user: `tenant_01`.`John`. DISK_BYTES_WRITTEN(220.0 GiB) exceeds quota(100.0 GiB). "))
")) + assert(res3 == CheckQuotaResponse( + false, + "Interrupt or reject application caused by the user storage usage reach threshold. " + + "Used: ResourceConsumption(" + + "diskBytesWritten: 150.0 GiB, " + + "diskFileCount: 0, " + + "hdfsBytesWritten: 0.0 B, " + + "hdfsFileCount: 0), " + + "Threshold: Quota[" + + "diskBytesWritten=100.0 GiB, " + + "diskFileCount=10000, " + + "hdfsBytesWritten=8.0 EiB, " + + "hdfsFileCount=9223372036854775807]")) + assert(res4 == CheckQuotaResponse( + false, + "Interrupt application caused by the tenant storage usage reach threshold. " + + "Used: ResourceConsumption(" + + "diskBytesWritten: 80.0 GiB, " + + "diskFileCount: 0, " + + "hdfsBytesWritten: 0.0 B, " + + "hdfsFileCount: 0), " + + "Threshold: Quota[" + + "diskBytesWritten=150.0 GiB, " + + "diskFileCount=1500, " + + "hdfsBytesWritten=8.0 EiB, " + + "hdfsFileCount=9223372036854775807]")) + assert(res5 == CheckQuotaResponse( + false, + "Interrupt or reject application caused by the user storage usage reach threshold. " + + "Used: ResourceConsumption(" + + "diskBytesWritten: 150.0 GiB, " + + "diskFileCount: 0, " + + "hdfsBytesWritten: 0.0 B, " + + "hdfsFileCount: 0), " + + "Threshold: Quota[" + + "diskBytesWritten=100.0 GiB, " + + "diskFileCount=10000, " + + "hdfsBytesWritten=8.0 EiB, " + + "hdfsFileCount=9223372036854775807]")) + assert(res6 == CheckQuotaResponse(true, "")) + clearUserConsumption() + } + def checkUserQuota(userIdentifier: UserIdentifier): CheckQuotaResponse = { quotaManager.checkUserQuotaStatus(userIdentifier) } @@ -540,6 +640,7 @@ class QuotaManagerSuite extends CelebornFunSuite userIdentifier: UserIdentifier, resourceConsumption: ResourceConsumption): Unit = { worker.userResourceConsumption.put(userIdentifier, resourceConsumption) + workerToResourceConsumptions.put(worker.toUniqueId(), worker.userResourceConsumption) } def clearUserConsumption(): Unit = { diff --git a/service/src/main/java/org/apache/celeborn/server/common/service/config/DynamicConfig.java b/service/src/main/java/org/apache/celeborn/server/common/service/config/DynamicConfig.java index fd2ce4b8eb8..daa4b67d460 100644 --- a/service/src/main/java/org/apache/celeborn/server/common/service/config/DynamicConfig.java +++ b/service/src/main/java/org/apache/celeborn/server/common/service/config/DynamicConfig.java @@ -96,23 +96,23 @@ public T formatValue( public StorageQuota getTenantStorageQuota() { return new StorageQuota( getValue( - CelebornConf.QUOTA_DISK_BYTES_WRITTEN().key(), - CelebornConf.QUOTA_DISK_BYTES_WRITTEN(), + CelebornConf.QUOTA_TENANT_DISK_BYTES_WRITTEN().key(), + CelebornConf.QUOTA_TENANT_DISK_BYTES_WRITTEN(), Long.TYPE, ConfigType.BYTES), getValue( - CelebornConf.QUOTA_DISK_FILE_COUNT().key(), - CelebornConf.QUOTA_DISK_FILE_COUNT(), + CelebornConf.QUOTA_TENANT_DISK_FILE_COUNT().key(), + CelebornConf.QUOTA_TENANT_DISK_FILE_COUNT(), Long.TYPE, ConfigType.STRING), getValue( - CelebornConf.QUOTA_HDFS_BYTES_WRITTEN().key(), - CelebornConf.QUOTA_HDFS_BYTES_WRITTEN(), + CelebornConf.QUOTA_TENANT_HDFS_BYTES_WRITTEN().key(), + CelebornConf.QUOTA_TENANT_HDFS_BYTES_WRITTEN(), Long.TYPE, ConfigType.BYTES), getValue( - CelebornConf.QUOTA_HDFS_FILE_COUNT().key(), - CelebornConf.QUOTA_HDFS_FILE_COUNT(), + CelebornConf.QUOTA_TENANT_HDFS_FILE_COUNT().key(), + CelebornConf.QUOTA_TENANT_HDFS_FILE_COUNT(), Long.TYPE, ConfigType.STRING)); } @@ -193,6 +193,30 @@ public StorageQuota getClusterStorageQuota() { ConfigType.STRING)); } + public StorageQuota getUserStorageQuota() { + return new StorageQuota( + getValue( + 
+            CelebornConf.QUOTA_USER_DISK_BYTES_WRITTEN().key(),
+            CelebornConf.QUOTA_USER_DISK_BYTES_WRITTEN(),
+            Long.TYPE,
+            ConfigType.BYTES),
+        getValue(
+            CelebornConf.QUOTA_USER_DISK_FILE_COUNT().key(),
+            CelebornConf.QUOTA_USER_DISK_FILE_COUNT(),
+            Long.TYPE,
+            ConfigType.STRING),
+        getValue(
+            CelebornConf.QUOTA_USER_HDFS_BYTES_WRITTEN().key(),
+            CelebornConf.QUOTA_USER_HDFS_BYTES_WRITTEN(),
+            Long.TYPE,
+            ConfigType.BYTES),
+        getValue(
+            CelebornConf.QUOTA_USER_HDFS_FILE_COUNT().key(),
+            CelebornConf.QUOTA_USER_HDFS_FILE_COUNT(),
+            Long.TYPE,
+            ConfigType.STRING));
+  }
+
   public boolean interruptShuffleEnabled() {
     return getValue(
         CelebornConf.QUOTA_INTERRUPT_SHUFFLE_ENABLED().key(),
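
A note on how getUserStorageQuota and getTenantStorageQuota resolve values: DynamicConfig#getValue walks the dynamic configuration levels, so a key set for a specific tenant user should override the tenant's value, which overrides the system-level value, with the static CelebornConf default (Long.MaxValue for the new quota entries) as the last resort. The sketch below encodes that assumed TENANT_USER -> TENANT -> SYSTEM -> default order in simplified form; it is not the DynamicConfig implementation:

object DynamicConfigFallbackSketch {
  // Levels ordered from most to least specific.
  sealed trait Level
  case object TenantUserLevel extends Level
  case object TenantLevel extends Level
  case object SystemLevel extends Level

  // configs(level) holds the key/value pairs defined at that level (from the yaml).
  def resolve(
      configs: Map[Level, Map[String, String]],
      key: String,
      staticDefault: String): String = {
    Seq(TenantUserLevel, TenantLevel, SystemLevel)
      .flatMap(level => configs.getOrElse(level, Map.empty).get(key))
      .headOption
      .getOrElse(staticDefault) // falls back to the CelebornConf / ConfigEntry default
  }

  def main(args: Array[String]): Unit = {
    val configs = Map[Level, Map[String, String]](
      SystemLevel -> Map("celeborn.quota.user.diskBytesWritten" -> "1000G"),
      TenantLevel -> Map.empty,
      TenantUserLevel -> Map("celeborn.quota.user.diskBytesWritten" -> "100G"))
    // The tenant-user value wins; with no user-level entry, SYSTEM's 1000G would
    // apply, and with neither, the Long.MaxValue default from CelebornConf would.
    println(resolve(configs, "celeborn.quota.user.diskBytesWritten", Long.MaxValue.toString))
  }
}
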