Skip to content

Commit

Permalink
[CELEBORN-1577][Phase2] QuotaManager should support interrupt shuffle.
Browse files Browse the repository at this point in the history
  • Loading branch information
leixm committed Dec 6, 2024
1 parent 8948df1 commit 1b94be6
Show file tree
Hide file tree
Showing 19 changed files with 1,205 additions and 244 deletions.
96 changes: 84 additions & 12 deletions common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5393,40 +5393,40 @@ object CelebornConf extends Logging {
.stringConf
.createWithDefault("org.apache.celeborn.server.common.container.DefaultContainerInfoProvider")

// Tenant-level dynamic quota on bytes written to disk, keyed by
// `celeborn.quota.tenant.diskBytesWritten`; the default is Long.MaxValue.
// NOTE(review): this span is a rendered diff. In each adjacent pair (`val`, `.doc`, conf type)
// the first line looks like the removed version and the second the added one -- confirm.
val QUOTA_DISK_BYTES_WRITTEN: ConfigEntry[Long] =
val QUOTA_TENANT_DISK_BYTES_WRITTEN: ConfigEntry[Long] =
buildConf("celeborn.quota.tenant.diskBytesWritten")
.categories("quota")
.dynamic
.doc("Quota dynamic configuration for written disk bytes.")
.doc("Tenant level quota dynamic configuration for written disk bytes.")
.version("0.5.0")
.longConf
.bytesConf(ByteUnit.BYTE)
.createWithDefault(Long.MaxValue)

// Tenant-level dynamic quota on the number of files written to disk, keyed by
// `celeborn.quota.tenant.diskFileCount`; the default is Long.MaxValue.
// The value is a count, not a size, so the conf type stays `.longConf`, matching the
// cluster- and user-level file-count entries. Declaring it with `.bytesConf(ByteUnit.BYTE)`
// would turn the documented default into a byte-suffixed value.
val QUOTA_DISK_FILE_COUNT: ConfigEntry[Long] =
val QUOTA_TENANT_DISK_FILE_COUNT: ConfigEntry[Long] =
buildConf("celeborn.quota.tenant.diskFileCount")
.categories("quota")
.dynamic
.doc("Quota dynamic configuration for written disk file count.")
.doc("Tenant level quota dynamic configuration for written disk file count.")
.version("0.5.0")
.longConf
.createWithDefault(Long.MaxValue)

// Tenant-level dynamic quota on bytes written to HDFS, keyed by
// `celeborn.quota.tenant.hdfsBytesWritten`; the default is Long.MaxValue.
// NOTE(review): this span is a rendered diff. In each adjacent pair (`val`, `.doc`, conf type)
// the first line looks like the removed version and the second the added one -- confirm.
val QUOTA_HDFS_BYTES_WRITTEN: ConfigEntry[Long] =
val QUOTA_TENANT_HDFS_BYTES_WRITTEN: ConfigEntry[Long] =
buildConf("celeborn.quota.tenant.hdfsBytesWritten")
.categories("quota")
.dynamic
.doc("Quota dynamic configuration for written hdfs bytes.")
.doc("Tenant level quota dynamic configuration for written hdfs bytes.")
.version("0.5.0")
.longConf
.bytesConf(ByteUnit.BYTE)
.createWithDefault(Long.MaxValue)

// Tenant-level dynamic quota on the number of files written to HDFS, keyed by
// `celeborn.quota.tenant.hdfsFileCount`; the default is Long.MaxValue.
// The value is a count, not a size, so the conf type stays `.longConf`, matching the
// cluster- and user-level file-count entries. Declaring it with `.bytesConf(ByteUnit.BYTE)`
// would turn the documented default into a byte-suffixed value.
val QUOTA_HDFS_FILE_COUNT: ConfigEntry[Long] =
val QUOTA_TENANT_HDFS_FILE_COUNT: ConfigEntry[Long] =
buildConf("celeborn.quota.tenant.hdfsFileCount")
.categories("quota")
.dynamic
.doc("Quota dynamic configuration for written hdfs file count.")
.doc("Tenant level quota dynamic configuration for written hdfs file count.")
.version("0.5.0")
.longConf
.createWithDefault(Long.MaxValue)

val QUOTA_INTERRUPT_SHUFFLE_ENABLED: ConfigEntry[Boolean] =
Expand Down Expand Up @@ -6033,4 +6033,76 @@ object CelebornConf extends Logging {
.doubleConf
.checkValue(v => v > 0.0 && v <= 1.0, "Should be in (0.0, 1.0].")
.createWithDefault(1)

// Cluster-level dynamic quota on bytes written to disk, keyed by
// `celeborn.quota.cluster.diskBytesWritten`. Declared with bytesConf(ByteUnit.BYTE);
// the default is Long.MaxValue.
val QUOTA_CLUSTER_DISK_BYTES_WRITTEN: ConfigEntry[Long] =
buildConf("celeborn.quota.cluster.diskBytesWritten")
.categories("quota")
.dynamic
.doc("Cluster level quota dynamic configuration for written disk bytes.")
.version("0.6.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(Long.MaxValue)

// Cluster-level dynamic quota on the number of files written to disk, keyed by
// `celeborn.quota.cluster.diskFileCount`. A plain long count; the default is Long.MaxValue.
val QUOTA_CLUSTER_DISK_FILE_COUNT: ConfigEntry[Long] =
buildConf("celeborn.quota.cluster.diskFileCount")
.categories("quota")
.dynamic
.doc("Cluster level quota dynamic configuration for written disk file count.")
.version("0.6.0")
.longConf
.createWithDefault(Long.MaxValue)

// Cluster-level dynamic quota on bytes written to HDFS, keyed by
// `celeborn.quota.cluster.hdfsBytesWritten`. Declared with bytesConf(ByteUnit.BYTE);
// the default is Long.MaxValue.
val QUOTA_CLUSTER_HDFS_BYTES_WRITTEN: ConfigEntry[Long] =
buildConf("celeborn.quota.cluster.hdfsBytesWritten")
.categories("quota")
.dynamic
.doc("Cluster level quota dynamic configuration for written hdfs bytes.")
.version("0.6.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(Long.MaxValue)

// Cluster-level dynamic quota on the number of files written to HDFS, keyed by
// `celeborn.quota.cluster.hdfsFileCount`. A plain long count; the default is Long.MaxValue.
val QUOTA_CLUSTER_HDFS_FILE_COUNT: ConfigEntry[Long] =
buildConf("celeborn.quota.cluster.hdfsFileCount")
.categories("quota")
.dynamic
.doc("Cluster level quota dynamic configuration for written hdfs file count.")
.version("0.6.0")
.longConf
.createWithDefault(Long.MaxValue)

// User-level dynamic quota on bytes written to disk, keyed by
// `celeborn.quota.user.diskBytesWritten`. Declared with bytesConf(ByteUnit.BYTE);
// the default is Long.MaxValue.
val QUOTA_USER_DISK_BYTES_WRITTEN: ConfigEntry[Long] =
buildConf("celeborn.quota.user.diskBytesWritten")
.categories("quota")
.dynamic
.doc("User level quota dynamic configuration for written disk bytes.")
.version("0.6.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(Long.MaxValue)

// User-level dynamic quota on the number of files written to disk, keyed by
// `celeborn.quota.user.diskFileCount`. A plain long count; the default is Long.MaxValue.
val QUOTA_USER_DISK_FILE_COUNT: ConfigEntry[Long] =
buildConf("celeborn.quota.user.diskFileCount")
.categories("quota")
.dynamic
.doc("User level quota dynamic configuration for written disk file count.")
.version("0.6.0")
.longConf
.createWithDefault(Long.MaxValue)

// User-level dynamic quota on bytes written to HDFS, keyed by
// `celeborn.quota.user.hdfsBytesWritten`. Declared with bytesConf(ByteUnit.BYTE);
// the default is Long.MaxValue.
val QUOTA_USER_HDFS_BYTES_WRITTEN: ConfigEntry[Long] =
buildConf("celeborn.quota.user.hdfsBytesWritten")
.categories("quota")
.dynamic
.doc("User level quota dynamic configuration for written hdfs bytes.")
.version("0.6.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(Long.MaxValue)

// User-level dynamic quota on the number of files written to HDFS, keyed by
// `celeborn.quota.user.hdfsFileCount`. A plain long count; the default is Long.MaxValue.
val QUOTA_USER_HDFS_FILE_COUNT: ConfigEntry[Long] =
buildConf("celeborn.quota.user.hdfsFileCount")
.categories("quota")
.dynamic
.doc("User level quota dynamic configuration for written hdfs file count.")
.version("0.6.0")
.longConf
.createWithDefault(Long.MaxValue)
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ case class ResourceConsumption(
hdfsFileCount: Long,
var subResourceConsumptions: util.Map[String, ResourceConsumption] = null) {

/**
 * Replaces this instance's sub-resource consumption map and returns the same instance,
 * so the call can be chained. This mutates the receiver; no copy is made.
 *
 * @param resourceConsumptions map to store in `subResourceConsumptions`
 * @return this instance, after the assignment
 */
def withSubResourceConsumptions(
    resourceConsumptions: util.Map[String, ResourceConsumption]): ResourceConsumption = {
  this.subResourceConsumptions = resourceConsumptions
  this
}

def add(other: ResourceConsumption): ResourceConsumption = {
ResourceConsumption(
diskBytesWritten + other.diskBytesWritten,
Expand All @@ -38,6 +44,14 @@ case class ResourceConsumption(
hdfsFileCount + other.hdfsFileCount)
}

/**
 * Field-wise difference between this consumption and `other`.
 *
 * Only the four counters are subtracted; the result is built without passing
 * `subResourceConsumptions`, so that field takes its constructor default.
 *
 * @param other the consumption to subtract from this one
 * @return a new [[ResourceConsumption]] holding `this - other` for each counter
 */
def subtract(other: ResourceConsumption): ResourceConsumption = {
  val diskBytesDelta = diskBytesWritten - other.diskBytesWritten
  val diskFilesDelta = diskFileCount - other.diskFileCount
  val hdfsBytesDelta = hdfsBytesWritten - other.hdfsBytesWritten
  val hdfsFilesDelta = hdfsFileCount - other.hdfsFileCount
  ResourceConsumption(diskBytesDelta, diskFilesDelta, hdfsBytesDelta, hdfsFilesDelta)
}

def addSubResourceConsumptions(otherSubResourceConsumptions: Map[
String,
ResourceConsumption]): Map[String, ResourceConsumption] = {
Expand Down Expand Up @@ -77,4 +91,11 @@ case class ResourceConsumption(
s" hdfsFileCount: $hdfsFileCount," +
s" subResourceConsumptions: $subResourceConsumptionString)"
}

/**
 * Compact rendering of the four counters. The byte counters are formatted through
 * `Utils.bytesToString`; sub-resource consumptions are not included.
 */
def simpleString: String = {
  val diskBytes = Utils.bytesToString(diskBytesWritten)
  val hdfsBytes = Utils.bytesToString(hdfsBytesWritten)
  s"ResourceConsumption(diskBytesWritten: $diskBytes," +
    s" diskFileCount: $diskFileCount," +
    s" hdfsBytesWritten: $hdfsBytes," +
    s" hdfsFileCount: $hdfsFileCount)"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.celeborn.common.quota
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.util.Utils

case class Quota(
case class StorageQuota(
diskBytesWritten: Long,
diskFileCount: Long,
hdfsBytesWritten: Long,
Expand All @@ -34,3 +34,7 @@ case class Quota(
s"]"
}
}

object StorageQuota {

  /** Quota whose four limits are all set to `Long.MaxValue`. */
  val DEFAULT_QUOTA: StorageQuota = {
    val max = Long.MaxValue
    StorageQuota(max, max, max, max)
  }
}
16 changes: 12 additions & 4 deletions docs/configuration/quota.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,18 @@ license: |
<!--begin-include-->
| Key | Default | isDynamic | Description | Since | Deprecated |
| --- | ------- | --------- | ----------- | ----- | ---------- |
| celeborn.quota.cluster.diskBytesWritten | 9223372036854775807b | true | Cluster level quota dynamic configuration for written disk bytes. | 0.6.0 | |
| celeborn.quota.cluster.diskFileCount | 9223372036854775807 | true | Cluster level quota dynamic configuration for written disk file count. | 0.6.0 | |
| celeborn.quota.cluster.hdfsBytesWritten | 9223372036854775807b | true | Cluster level quota dynamic configuration for written hdfs bytes. | 0.6.0 | |
| celeborn.quota.cluster.hdfsFileCount | 9223372036854775807 | true | Cluster level quota dynamic configuration for written hdfs file count. | 0.6.0 | |
| celeborn.quota.enabled | true | false | When Master side sets to true, the master will enable to check the quota via QuotaManager. When Client side sets to true, LifecycleManager will request Master side to check whether the current user has enough quota before registration of shuffle. Fallback to the default shuffle service when Master side checks that there is no enough quota for current user. | 0.2.0 | |
| celeborn.quota.interruptShuffle.enabled | false | false | Whether to enable interrupt shuffle when quota exceeds. | 0.6.0 | |
| celeborn.quota.tenant.diskBytesWritten | 9223372036854775807 | true | Quota dynamic configuration for written disk bytes. | 0.5.0 | |
| celeborn.quota.tenant.diskFileCount | 9223372036854775807 | true | Quota dynamic configuration for written disk file count. | 0.5.0 | |
| celeborn.quota.tenant.hdfsBytesWritten | 9223372036854775807 | true | Quota dynamic configuration for written hdfs bytes. | 0.5.0 | |
| celeborn.quota.tenant.hdfsFileCount | 9223372036854775807 | true | Quota dynamic configuration for written hdfs file count. | 0.5.0 | |
| celeborn.quota.tenant.diskBytesWritten | 9223372036854775807b | true | Tenant level quota dynamic configuration for written disk bytes. | 0.5.0 | |
| celeborn.quota.tenant.diskFileCount | 9223372036854775807b | true | Tenant level quota dynamic configuration for written disk file count. | 0.5.0 | |
| celeborn.quota.tenant.hdfsBytesWritten | 9223372036854775807b | true | Tenant level quota dynamic configuration for written hdfs bytes. | 0.5.0 | |
| celeborn.quota.tenant.hdfsFileCount | 9223372036854775807b | true | Tenant level quota dynamic configuration for written hdfs file count. | 0.5.0 | |
| celeborn.quota.user.diskBytesWritten | 9223372036854775807b | true | User level quota dynamic configuration for written disk bytes. | 0.6.0 | |
| celeborn.quota.user.diskFileCount | 9223372036854775807 | true | User level quota dynamic configuration for written disk file count. | 0.6.0 | |
| celeborn.quota.user.hdfsBytesWritten | 9223372036854775807b | true | User level quota dynamic configuration for written hdfs bytes. | 0.6.0 | |
| celeborn.quota.user.hdfsFileCount | 9223372036854775807 | true | User level quota dynamic configuration for written hdfs file count. | 0.6.0 | |
<!--end-include-->
8 changes: 8 additions & 0 deletions docs/migration.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ license: |

# Upgrading from 0.5 to 0.6

- Since 0.6.0, Celeborn has changed `celeborn.quota.tenant.diskBytesWritten` to `celeborn.quota.user.diskBytesWritten`. Please use `celeborn.quota.user.diskBytesWritten` to set the user-level quota.

- Since 0.6.0, Celeborn has changed `celeborn.quota.tenant.diskFileCount` to `celeborn.quota.user.diskFileCount`. Please use `celeborn.quota.user.diskFileCount` to set the user-level quota.

- Since 0.6.0, Celeborn has changed `celeborn.quota.tenant.hdfsBytesWritten` to `celeborn.quota.user.hdfsBytesWritten`. Please use `celeborn.quota.user.hdfsBytesWritten` to set the user-level quota.

- Since 0.6.0, Celeborn has changed `celeborn.quota.tenant.hdfsFileCount` to `celeborn.quota.user.hdfsFileCount`. Please use `celeborn.quota.user.hdfsFileCount` to set the user-level quota.

- Since 0.6.0, Celeborn deprecates `celeborn.worker.congestionControl.low.watermark`. Please use `celeborn.worker.congestionControl.diskBuffer.low.watermark` instead.

- Since 0.6.0, Celeborn deprecates `celeborn.worker.congestionControl.high.watermark`. Please use `celeborn.worker.congestionControl.diskBuffer.high.watermark` instead.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,21 +250,18 @@ public void updateWorkerHeartbeatMeta(
int fetchPort,
int replicatePort,
Map<String, DiskInfo> disks,
Map<UserIdentifier, ResourceConsumption> userResourceConsumption,
long time,
WorkerStatus workerStatus,
boolean highWorkload) {
WorkerInfo worker =
new WorkerInfo(
host, rpcPort, pushPort, fetchPort, replicatePort, -1, disks, userResourceConsumption);
new WorkerInfo(host, rpcPort, pushPort, fetchPort, replicatePort, -1, disks, null);
AtomicLong availableSlots = new AtomicLong();
LOG.debug("update worker {}:{} heartbeat {}", host, rpcPort, disks);
synchronized (workersMap) {
Optional<WorkerInfo> workerInfo = Optional.ofNullable(workersMap.get(worker.toUniqueId()));
workerInfo.ifPresent(
info -> {
info.updateThenGetDiskInfos(disks, Option.apply(estimatedPartitionSize));
info.updateThenGetUserResourceConsumption(userResourceConsumption);
availableSlots.set(info.totalAvailableSlots());
info.lastHeartbeat_$eq(time);
info.setWorkerStatus(workerStatus);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ void handleWorkerHeartbeat(
int fetchPort,
int replicatePort,
Map<String, DiskInfo> disks,
Map<UserIdentifier, ResourceConsumption> userResourceConsumption,
long time,
boolean highWorkload,
WorkerStatus workerStatus,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,22 +118,12 @@ public void handleWorkerHeartbeat(
int fetchPort,
int replicatePort,
Map<String, DiskInfo> disks,
Map<UserIdentifier, ResourceConsumption> userResourceConsumption,
long time,
boolean highWorkload,
WorkerStatus workerStatus,
String requestId) {
updateWorkerHeartbeatMeta(
host,
rpcPort,
pushPort,
fetchPort,
replicatePort,
disks,
userResourceConsumption,
time,
workerStatus,
highWorkload);
host, rpcPort, pushPort, fetchPort, replicatePort, disks, time, workerStatus, highWorkload);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,6 @@ public void handleWorkerHeartbeat(
int fetchPort,
int replicatePort,
Map<String, DiskInfo> disks,
Map<UserIdentifier, ResourceConsumption> userResourceConsumption,
long time,
boolean highWorkload,
WorkerStatus workerStatus,
Expand All @@ -290,8 +289,6 @@ public void handleWorkerHeartbeat(
.setFetchPort(fetchPort)
.setReplicatePort(replicatePort)
.putAllDisks(MetaUtil.toPbDiskInfos(disks))
.putAllUserResourceConsumption(
MetaUtil.toPbUserResourceConsumption(userResourceConsumption))
.setWorkerStatus(MetaUtil.toPbWorkerStatus(workerStatus))
.setTime(time)
.setHighWorkload(highWorkload)
Expand Down
Loading

0 comments on commit 1b94be6

Please sign in to comment.