[CELEBORN-1577][Phase2] QuotaManager should support interrupt shuffle. #2819

Open — wants to merge 3 commits into base: main

Changes from all commits
96 changes: 84 additions & 12 deletions common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -5411,40 +5411,40 @@ object CelebornConf extends Logging {
      .stringConf
      .createWithDefault("org.apache.celeborn.server.common.container.DefaultContainerInfoProvider")

-  val QUOTA_DISK_BYTES_WRITTEN: ConfigEntry[Long] =
+  val QUOTA_TENANT_DISK_BYTES_WRITTEN: ConfigEntry[Long] =
    buildConf("celeborn.quota.tenant.diskBytesWritten")
      .categories("quota")
      .dynamic
-      .doc("Quota dynamic configuration for written disk bytes.")
+      .doc("Tenant level quota dynamic configuration for written disk bytes.")
      .version("0.5.0")
-      .longConf
+      .bytesConf(ByteUnit.BYTE)
      .createWithDefault(Long.MaxValue)

-  val QUOTA_DISK_FILE_COUNT: ConfigEntry[Long] =
+  val QUOTA_TENANT_DISK_FILE_COUNT: ConfigEntry[Long] =
    buildConf("celeborn.quota.tenant.diskFileCount")
      .categories("quota")
      .dynamic
-      .doc("Quota dynamic configuration for written disk file count.")
+      .doc("Tenant level quota dynamic configuration for written disk file count.")
      .version("0.5.0")
-      .longConf
+      .bytesConf(ByteUnit.BYTE)
      .createWithDefault(Long.MaxValue)

-  val QUOTA_HDFS_BYTES_WRITTEN: ConfigEntry[Long] =
+  val QUOTA_TENANT_HDFS_BYTES_WRITTEN: ConfigEntry[Long] =
    buildConf("celeborn.quota.tenant.hdfsBytesWritten")
      .categories("quota")
      .dynamic
-      .doc("Quota dynamic configuration for written hdfs bytes.")
+      .doc("Tenant level quota dynamic configuration for written hdfs bytes.")
      .version("0.5.0")
-      .longConf
+      .bytesConf(ByteUnit.BYTE)
      .createWithDefault(Long.MaxValue)

-  val QUOTA_HDFS_FILE_COUNT: ConfigEntry[Long] =
+  val QUOTA_TENANT_HDFS_FILE_COUNT: ConfigEntry[Long] =
    buildConf("celeborn.quota.tenant.hdfsFileCount")
      .categories("quota")
      .dynamic
-      .doc("Quota dynamic configuration for written hdfs file count.")
+      .doc("Tenant level quota dynamic configuration for written hdfs file count.")
      .version("0.5.0")
-      .longConf
+      .bytesConf(ByteUnit.BYTE)
      .createWithDefault(Long.MaxValue)

  val QUOTA_INTERRUPT_SHUFFLE_ENABLED: ConfigEntry[Boolean] =
@@ -6051,4 +6051,76 @@ object CelebornConf extends Logging {
.doubleConf
.checkValue(v => v > 0.0 && v <= 1.0, "Should be in (0.0, 1.0].")
.createWithDefault(1)

+  val QUOTA_CLUSTER_DISK_BYTES_WRITTEN: ConfigEntry[Long] =
+    buildConf("celeborn.quota.cluster.diskBytesWritten")
+      .categories("quota")
+      .dynamic
+      .doc("Cluster level quota dynamic configuration for written disk bytes.")
+      .version("0.6.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(Long.MaxValue)
+
+  val QUOTA_CLUSTER_DISK_FILE_COUNT: ConfigEntry[Long] =
+    buildConf("celeborn.quota.cluster.diskFileCount")
+      .categories("quota")
+      .dynamic
+      .doc("Cluster level quota dynamic configuration for written disk file count.")
+      .version("0.6.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(Long.MaxValue)
+
+  val QUOTA_CLUSTER_HDFS_BYTES_WRITTEN: ConfigEntry[Long] =
+    buildConf("celeborn.quota.cluster.hdfsBytesWritten")
+      .categories("quota")
+      .dynamic
+      .doc("Cluster level quota dynamic configuration for written hdfs bytes.")
+      .version("0.6.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(Long.MaxValue)
+
+  val QUOTA_CLUSTER_HDFS_FILE_COUNT: ConfigEntry[Long] =
+    buildConf("celeborn.quota.cluster.hdfsFileCount")
+      .categories("quota")
+      .dynamic
+      .doc("Cluster level quota dynamic configuration for written hdfs file count.")
+      .version("0.6.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(Long.MaxValue)
+
+  val QUOTA_USER_DISK_BYTES_WRITTEN: ConfigEntry[Long] =
+    buildConf("celeborn.quota.user.diskBytesWritten")
+      .categories("quota")
+      .dynamic
+      .doc("User level quota dynamic configuration for written disk bytes.")
+      .version("0.6.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(Long.MaxValue)
+
+  val QUOTA_USER_DISK_FILE_COUNT: ConfigEntry[Long] =
+    buildConf("celeborn.quota.user.diskFileCount")
+      .categories("quota")
+      .dynamic
+      .doc("User level quota dynamic configuration for written disk file count.")
+      .version("0.6.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(Long.MaxValue)
+
+  val QUOTA_USER_HDFS_BYTES_WRITTEN: ConfigEntry[Long] =
+    buildConf("celeborn.quota.user.hdfsBytesWritten")
+      .categories("quota")
+      .dynamic
+      .doc("User level quota dynamic configuration for written hdfs bytes.")
+      .version("0.6.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(Long.MaxValue)
+
+  val QUOTA_USER_HDFS_FILE_COUNT: ConfigEntry[Long] =
+    buildConf("celeborn.quota.user.hdfsFileCount")
+      .categories("quota")
+      .dynamic
+      .doc("User level quota dynamic configuration for written hdfs file count.")
+      .version("0.6.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(Long.MaxValue)
}
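For reference, both the renamed tenant entries and the new cluster/user entries use `bytesConf(ByteUnit.BYTE)` and default to `Long.MaxValue`, i.e. unlimited until overridden. A minimal sketch of how they resolve, assuming a locally constructed `CelebornConf` (the `set`/`get` calls are existing Celeborn API, not part of this PR):

```scala
import org.apache.celeborn.common.CelebornConf

// Byte-suffixed strings are parsed by the bytesConf entries.
val conf = new CelebornConf()
  .set("celeborn.quota.user.diskBytesWritten", "1tb")

// Resolves to the configured value, in bytes.
val userDiskQuota = conf.get(CelebornConf.QUOTA_USER_DISK_BYTES_WRITTEN)

// Unset entries fall back to Long.MaxValue, the "unlimited" sentinel.
val clusterDiskQuota = conf.get(CelebornConf.QUOTA_CLUSTER_DISK_BYTES_WRITTEN)
```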
@@ -30,6 +30,12 @@ case class ResourceConsumption(
    hdfsFileCount: Long,
    var subResourceConsumptions: util.Map[String, ResourceConsumption] = null) {

+  def withSubResourceConsumptions(
+      resourceConsumptions: util.Map[String, ResourceConsumption]): ResourceConsumption = {
+    subResourceConsumptions = resourceConsumptions
+    this
+  }
+
  def add(other: ResourceConsumption): ResourceConsumption = {
    ResourceConsumption(
      diskBytesWritten + other.diskBytesWritten,
@@ -38,6 +44,14 @@
      hdfsFileCount + other.hdfsFileCount)
  }

+  def subtract(other: ResourceConsumption): ResourceConsumption = {
+    ResourceConsumption(
+      diskBytesWritten - other.diskBytesWritten,
+      diskFileCount - other.diskFileCount,
+      hdfsBytesWritten - other.hdfsBytesWritten,
+      hdfsFileCount - other.hdfsFileCount)
+  }
+
  def addSubResourceConsumptions(otherSubResourceConsumptions: Map[
      String,
      ResourceConsumption]): Map[String, ResourceConsumption] = {
@@ -77,4 +91,11 @@
      s" hdfsFileCount: $hdfsFileCount," +
      s" subResourceConsumptions: $subResourceConsumptionString)"
  }
+
+  def simpleString: String = {
+    s"ResourceConsumption(diskBytesWritten: ${Utils.bytesToString(diskBytesWritten)}," +
+      s" diskFileCount: $diskFileCount," +
+      s" hdfsBytesWritten: ${Utils.bytesToString(hdfsBytesWritten)}," +
+      s" hdfsFileCount: $hdfsFileCount)"
+  }
}
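The new `subtract` mirrors `add` field by field, and `withSubResourceConsumptions` sets the per-user map in place and returns `this` for chaining. A small illustration with hypothetical values:

```scala
import java.util.Collections
import org.apache.celeborn.common.quota.ResourceConsumption

val current = ResourceConsumption(100L, 10L, 50L, 5L)
val baseline = ResourceConsumption(40L, 4L, 20L, 2L)

// Delta since the baseline snapshot: ResourceConsumption(60, 6, 30, 3).
val delta = current.subtract(baseline)

// Attach per-user sub-consumptions while keeping a fluent style.
val withUsers = delta.withSubResourceConsumptions(
  Collections.singletonMap("user1", ResourceConsumption(60L, 6L, 30L, 3L)))

println(withUsers.simpleString)
```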
@@ -20,7 +20,7 @@ package org.apache.celeborn.common.quota
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.util.Utils

-case class Quota(
+case class StorageQuota(
    diskBytesWritten: Long,
    diskFileCount: Long,
    hdfsBytesWritten: Long,
@@ -34,3 +34,7 @@
      s"]"
  }
}
+
+object StorageQuota {
+  val DEFAULT_QUOTA = StorageQuota(Long.MaxValue, Long.MaxValue, Long.MaxValue, Long.MaxValue)
+}
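`Quota` becomes `StorageQuota`, and the new companion object's `DEFAULT_QUOTA` encodes "unlimited" in every dimension. A hedged sketch of the kind of check the interrupt-shuffle path can make against it (`exceeds` is a hypothetical helper, not from this PR):

```scala
import org.apache.celeborn.common.quota.{ResourceConsumption, StorageQuota}

// Hypothetical helper: true if any dimension reaches its quota limit.
def exceeds(quota: StorageQuota, used: ResourceConsumption): Boolean =
  used.diskBytesWritten >= quota.diskBytesWritten ||
    used.diskFileCount >= quota.diskFileCount ||
    used.hdfsBytesWritten >= quota.hdfsBytesWritten ||
    used.hdfsFileCount >= quota.hdfsFileCount

// DEFAULT_QUOTA is Long.MaxValue everywhere, so nothing realistic trips it.
assert(!exceeds(StorageQuota.DEFAULT_QUOTA, ResourceConsumption(1L, 1L, 1L, 1L)))
```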
16 changes: 12 additions & 4 deletions docs/configuration/quota.md
@@ -19,10 +19,18 @@ license: |
<!--begin-include-->
| Key | Default | isDynamic | Description | Since | Deprecated |
| --- | ------- | --------- | ----------- | ----- | ---------- |
+| celeborn.quota.cluster.diskBytesWritten | 9223372036854775807b | true | Cluster level quota dynamic configuration for written disk bytes. | 0.6.0 | |
+| celeborn.quota.cluster.diskFileCount | 9223372036854775807b | true | Cluster level quota dynamic configuration for written disk file count. | 0.6.0 | |
+| celeborn.quota.cluster.hdfsBytesWritten | 9223372036854775807b | true | Cluster level quota dynamic configuration for written hdfs bytes. | 0.6.0 | |
+| celeborn.quota.cluster.hdfsFileCount | 9223372036854775807b | true | Cluster level quota dynamic configuration for written hdfs file count. | 0.6.0 | |
| celeborn.quota.enabled | true | false | When Master side sets to true, the master will enable to check the quota via QuotaManager. When Client side sets to true, LifecycleManager will request Master side to check whether the current user has enough quota before registration of shuffle. Fallback to the default shuffle service when Master side checks that there is no enough quota for current user. | 0.2.0 | |
| celeborn.quota.interruptShuffle.enabled | false | false | Whether to enable interrupt shuffle when quota exceeds. | 0.6.0 | |
-| celeborn.quota.tenant.diskBytesWritten | 9223372036854775807 | true | Quota dynamic configuration for written disk bytes. | 0.5.0 | |
-| celeborn.quota.tenant.diskFileCount | 9223372036854775807 | true | Quota dynamic configuration for written disk file count. | 0.5.0 | |
-| celeborn.quota.tenant.hdfsBytesWritten | 9223372036854775807 | true | Quota dynamic configuration for written hdfs bytes. | 0.5.0 | |
-| celeborn.quota.tenant.hdfsFileCount | 9223372036854775807 | true | Quota dynamic configuration for written hdfs file count. | 0.5.0 | |
+| celeborn.quota.tenant.diskBytesWritten | 9223372036854775807b | true | Tenant level quota dynamic configuration for written disk bytes. | 0.5.0 | |
+| celeborn.quota.tenant.diskFileCount | 9223372036854775807b | true | Tenant level quota dynamic configuration for written disk file count. | 0.5.0 | |
+| celeborn.quota.tenant.hdfsBytesWritten | 9223372036854775807b | true | Tenant level quota dynamic configuration for written hdfs bytes. | 0.5.0 | |
+| celeborn.quota.tenant.hdfsFileCount | 9223372036854775807b | true | Tenant level quota dynamic configuration for written hdfs file count. | 0.5.0 | |
+| celeborn.quota.user.diskBytesWritten | 9223372036854775807b | true | User level quota dynamic configuration for written disk bytes. | 0.6.0 | |
+| celeborn.quota.user.diskFileCount | 9223372036854775807b | true | User level quota dynamic configuration for written disk file count. | 0.6.0 | |
+| celeborn.quota.user.hdfsBytesWritten | 9223372036854775807b | true | User level quota dynamic configuration for written hdfs bytes. | 0.6.0 | |
+| celeborn.quota.user.hdfsFileCount | 9223372036854775807b | true | User level quota dynamic configuration for written hdfs file count. | 0.6.0 | |
<!--end-include-->
8 changes: 8 additions & 0 deletions docs/migration.md
@@ -23,6 +23,14 @@ license: |

# Upgrading from 0.5 to 0.6

+- Since 0.6.0, Celeborn moved user level quota configuration from `celeborn.quota.tenant.diskBytesWritten` to `celeborn.quota.user.diskBytesWritten`. Please use `celeborn.quota.user.diskBytesWritten` to set user level quota.
+
+- Since 0.6.0, Celeborn moved user level quota configuration from `celeborn.quota.tenant.diskFileCount` to `celeborn.quota.user.diskFileCount`. Please use `celeborn.quota.user.diskFileCount` to set user level quota.
+
+- Since 0.6.0, Celeborn moved user level quota configuration from `celeborn.quota.tenant.hdfsBytesWritten` to `celeborn.quota.user.hdfsBytesWritten`. Please use `celeborn.quota.user.hdfsBytesWritten` to set user level quota.
+
+- Since 0.6.0, Celeborn moved user level quota configuration from `celeborn.quota.tenant.hdfsFileCount` to `celeborn.quota.user.hdfsFileCount`. Please use `celeborn.quota.user.hdfsFileCount` to set user level quota.
+
- Since 0.6.0, Celeborn changed the default value of `celeborn.master.slot.assign.extraSlots` from `2` to `100`, which means Celeborn will involve more workers in offering slots.

- Since 0.6.0, Celeborn deprecate `celeborn.worker.congestionControl.low.watermark`. Please use `celeborn.worker.congestionControl.diskBuffer.low.watermark` instead.
@@ -250,21 +250,18 @@ public void updateWorkerHeartbeatMeta(
      int fetchPort,
      int replicatePort,
      Map<String, DiskInfo> disks,
-      Map<UserIdentifier, ResourceConsumption> userResourceConsumption,
      long time,
      WorkerStatus workerStatus,
      boolean highWorkload) {
    WorkerInfo worker =
-        new WorkerInfo(
-            host, rpcPort, pushPort, fetchPort, replicatePort, -1, disks, userResourceConsumption);
+        new WorkerInfo(host, rpcPort, pushPort, fetchPort, replicatePort, -1, disks, null);
    AtomicLong availableSlots = new AtomicLong();
    LOG.debug("update worker {}:{} heartbeat {}", host, rpcPort, disks);
    synchronized (workersMap) {
      Optional<WorkerInfo> workerInfo = Optional.ofNullable(workersMap.get(worker.toUniqueId()));
      workerInfo.ifPresent(
          info -> {
            info.updateThenGetDiskInfos(disks, Option.apply(estimatedPartitionSize));
-            info.updateThenGetUserResourceConsumption(userResourceConsumption);
            availableSlots.set(info.totalAvailableSlots());
            info.lastHeartbeat_$eq(time);
            info.setWorkerStatus(workerStatus);
@@ -613,4 +610,19 @@ private void addShuffleFallbackCounts(Map<String, Long> fallbackCounts) {
          fallbackPolicy, (k, v) -> v == null ? fallbackCounts.get(k) : v + fallbackCounts.get(k));
    }
  }
+
+  public void updateWorkerResourceConsumptions(
+      String host,
+      int rpcPort,
+      int pushPort,
+      int fetchPort,
+      int replicatePort,
+      Map<UserIdentifier, ResourceConsumption> resourceConsumptions) {
+    WorkerInfo worker =
+        new WorkerInfo(host, rpcPort, pushPort, fetchPort, replicatePort, -1, null, null);
+    synchronized (workersMap) {
+      Optional<WorkerInfo> workerInfo = Optional.ofNullable(workersMap.get(worker.toUniqueId()));
+      workerInfo.ifPresent(info -> info.updateThenGetUserResourceConsumption(resourceConsumptions));
+    }
+  }
}
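With `userResourceConsumption` removed from the heartbeat path, resource consumptions now reach worker metadata only through the dedicated `updateWorkerResourceConsumptions` entry point, which locates the worker by its unique id and leaves disk state untouched. A rough caller-side sketch, assuming `meta` is the `AbstractMetaManager` this diff modifies and using placeholder worker coordinates:

```scala
import java.util.Collections
import org.apache.celeborn.common.identity.UserIdentifier
import org.apache.celeborn.common.quota.ResourceConsumption
import org.apache.celeborn.service.deploy.master.clustermeta.AbstractMetaManager

// Placeholder coordinates identifying an already-registered worker.
def reportConsumptions(meta: AbstractMetaManager): Unit =
  meta.updateWorkerResourceConsumptions(
    "worker-host",
    9097, // rpcPort
    9096, // pushPort
    9092, // fetchPort
    9091, // replicatePort
    Collections.singletonMap(
      UserIdentifier("default", "user1"),
      ResourceConsumption(1024L, 1L, 0L, 0L)))
```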
@@ -66,7 +66,6 @@ void handleWorkerHeartbeat(
      int fetchPort,
      int replicatePort,
      Map<String, DiskInfo> disks,
-      Map<UserIdentifier, ResourceConsumption> userResourceConsumption,
      long time,
      boolean highWorkload,
      WorkerStatus workerStatus,
@@ -118,22 +118,12 @@ public void handleWorkerHeartbeat(
      int fetchPort,
      int replicatePort,
      Map<String, DiskInfo> disks,
-      Map<UserIdentifier, ResourceConsumption> userResourceConsumption,
      long time,
      boolean highWorkload,
      WorkerStatus workerStatus,
      String requestId) {
    updateWorkerHeartbeatMeta(
-        host,
-        rpcPort,
-        pushPort,
-        fetchPort,
-        replicatePort,
-        disks,
-        userResourceConsumption,
-        time,
-        workerStatus,
-        highWorkload);
+        host, rpcPort, pushPort, fetchPort, replicatePort, disks, time, workerStatus, highWorkload);
  }

  @Override
@@ -272,7 +272,6 @@ public void handleWorkerHeartbeat(
      int fetchPort,
      int replicatePort,
      Map<String, DiskInfo> disks,
-      Map<UserIdentifier, ResourceConsumption> userResourceConsumption,
      long time,
      boolean highWorkload,
      WorkerStatus workerStatus,
@@ -290,8 +289,6 @@
            .setFetchPort(fetchPort)
            .setReplicatePort(replicatePort)
            .putAllDisks(MetaUtil.toPbDiskInfos(disks))
-            .putAllUserResourceConsumption(
-                MetaUtil.toPbUserResourceConsumption(userResourceConsumption))
            .setWorkerStatus(MetaUtil.toPbWorkerStatus(workerStatus))
            .setTime(time)
            .setHighWorkload(highWorkload)
@@ -227,7 +227,6 @@ public ResourceResponse handleWriteRequest(ResourceProtos.ResourceRequest request) {
          fetchPort,
          replicatePort,
          diskInfos,
-          userResourceConsumption,
          request.getWorkerHeartbeatRequest().getTime(),
          workerStatus,
          highWorkload);