Skip to content

Commit

Permalink
[CELEBORN-1792] MemoryManager resume should use pinnedDirectMemory instead of usedDirectMemory
Browse files Browse the repository at this point in the history
  • Loading branch information
leixm committed Jan 12, 2025
1 parent eb950c8 commit 5fdc844
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

package org.apache.celeborn.common.network.util;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
Expand Down Expand Up @@ -47,6 +49,8 @@ public class NettyUtils {
private static final ByteBufAllocator[] _sharedByteBufAllocator = new ByteBufAllocator[2];
private static final ConcurrentHashMap<String, Integer> allocatorsIndex =
JavaUtils.newConcurrentHashMap();
private static final List<PooledByteBufAllocator> pooledByteBufAllocators = new ArrayList<>();

/** Creates a new ThreadFactory which prefixes each thread with the given name. */
public static ThreadFactory createThreadFactory(String threadPoolPrefix) {
return new DefaultThreadFactory(threadPoolPrefix, true);
Expand Down Expand Up @@ -141,6 +145,9 @@ public static synchronized ByteBufAllocator getSharedByteBufAllocator(
_sharedByteBufAllocator[index] =
createByteBufAllocator(
conf.networkMemoryAllocatorPooled(), true, allowCache, conf.networkAllocatorArenas());
if (conf.networkMemoryAllocatorPooled()) {
pooledByteBufAllocators.add((PooledByteBufAllocator) _sharedByteBufAllocator[index]);
}
if (source != null) {
new NettyMemoryMetrics(
_sharedByteBufAllocator[index],
Expand Down Expand Up @@ -178,6 +185,9 @@ public static ByteBufAllocator getByteBufAllocator(
conf.preferDirectBufs(),
allowCache,
arenas);
if (conf.getCelebornConf().networkMemoryAllocatorPooled()) {
pooledByteBufAllocators.add((PooledByteBufAllocator) allocator);
}
if (source != null) {
String poolName = "default-netty-pool";
Map<String, String> labels = new HashMap<>();
Expand All @@ -196,4 +206,8 @@ public static ByteBufAllocator getByteBufAllocator(
}
return allocator;
}

/**
 * Returns all pooled {@code ByteBufAllocator}s registered by this class (populated when
 * allocators are created with pooling enabled).
 *
 * <p>The list is wrapped in an unmodifiable view so callers cannot mutate the internal
 * registry; registration happens only inside this class.
 *
 * @return an unmodifiable view of the registered pooled allocators
 */
public static List<PooledByteBufAllocator> getAllPooledByteBufAllocators() {
return Collections.unmodifiableList(pooledByteBufAllocators);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3858,7 +3858,7 @@ object CelebornConf extends Logging {
.doc("If direct memory usage is less than this limit, worker will resume.")
.version("0.2.0")
.doubleConf
.createWithDefault(0.7)
.createWithDefault(0.3)

val WORKER_MEMORY_FILE_STORAGE_MAX_FILE_SIZE: ConfigEntry[Long] =
buildConf("celeborn.worker.memoryFileStorage.maxFileSize")
Expand Down
2 changes: 1 addition & 1 deletion docs/configuration/worker.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ license: |
| celeborn.worker.directMemoryRatioForReadBuffer | 0.1 | false | Max ratio of direct memory for read buffer | 0.2.0 | |
| celeborn.worker.directMemoryRatioToPauseReceive | 0.85 | false | If direct memory usage reaches this limit, the worker will stop to receive data from Celeborn shuffle clients. | 0.2.0 | |
| celeborn.worker.directMemoryRatioToPauseReplicate | 0.95 | false | If direct memory usage reaches this limit, the worker will stop to receive replication data from other workers. This value should be higher than celeborn.worker.directMemoryRatioToPauseReceive. | 0.2.0 | |
| celeborn.worker.directMemoryRatioToResume | 0.7 | false | If direct memory usage is less than this limit, worker will resume. | 0.2.0 | |
| celeborn.worker.directMemoryRatioToResume | 0.3 | false | If direct memory usage is less than this limit, worker will resume. | 0.2.0 | |
| celeborn.worker.disk.clean.threads | 4 | false | Thread number of worker to clean up directories of expired shuffle keys on disk. | 0.3.2 | |
| celeborn.worker.fetch.heartbeat.enabled | false | false | enable the heartbeat from worker to client when fetching data | 0.3.0 | |
| celeborn.worker.fetch.io.threads | &lt;undefined&gt; | false | Netty IO thread number of worker to handle client fetch data. The default threads number is the number of flush thread. | 0.2.0 | |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@
import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.util.internal.PlatformDependent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.metrics.source.AbstractSource;
import org.apache.celeborn.common.network.util.NettyUtils;
import org.apache.celeborn.common.protocol.TransportModuleConstants;
import org.apache.celeborn.common.util.ThreadUtils;
import org.apache.celeborn.common.util.Utils;
Expand Down Expand Up @@ -93,6 +95,7 @@ public class MemoryManager {
private long memoryFileStorageThreshold;
private final LongAdder memoryFileStorageCounter = new LongAdder();
private final StorageManager storageManager;
private boolean networkMemoryAllocatorPooled;

@VisibleForTesting
public static MemoryManager initialize(CelebornConf conf) {
Expand Down Expand Up @@ -159,6 +162,7 @@ private MemoryManager(CelebornConf conf, StorageManager storageManager, Abstract
readBufferThreshold = (long) (maxDirectMemory * readBufferRatio);
readBufferTarget = (long) (readBufferThreshold * readBufferTargetRatio);
memoryFileStorageThreshold = (long) (maxDirectMemory * memoryFileStorageRatio);
networkMemoryAllocatorPooled = conf.networkMemoryAllocatorPooled();

checkService.scheduleWithFixedDelay(
() -> {
Expand Down Expand Up @@ -293,6 +297,18 @@ public boolean shouldEvict(boolean aggressiveMemoryFileEvictEnabled, double evic

public ServingState currentServingState() {
long memoryUsage = getMemoryUsage();
long allocatedMemory;
if (networkMemoryAllocatorPooled) {
allocatedMemory = getAllocatedMemory();
} else {
allocatedMemory = memoryUsage;
}
// trigger resume
// CELEBORN-1792: resume should use pinnedDirectMemory instead of usedDirectMemory
if (allocatedMemory / (double) (maxDirectMemory) < resumeRatio) {
isPaused = false;
return ServingState.NONE_PAUSED;
}
// pause replicate threshold always greater than pause push data threshold
// so when trigger pause replicate, pause both push and replicate
if (memoryUsage > pauseReplicateThreshold) {
Expand All @@ -304,11 +320,6 @@ public ServingState currentServingState() {
isPaused = true;
return ServingState.PUSH_PAUSED;
}
// trigger resume
if (memoryUsage / (double) (maxDirectMemory) < resumeRatio) {
isPaused = false;
return ServingState.NONE_PAUSED;
}
// if isPaused and not trigger resume, then return pause push
// wait for trigger resumeThreshold to resume state
return isPaused ? ServingState.PUSH_PAUSED : ServingState.NONE_PAUSED;
Expand Down Expand Up @@ -436,6 +447,16 @@ public long getMemoryUsage() {
return getNettyUsedDirectMemory() + sortMemoryCounter.get();
}

/**
 * Total memory considered "allocated": direct memory pinned by Netty's pooled allocators
 * plus memory accounted to in-progress sorts.
 *
 * @return allocated memory in bytes
 */
public long getAllocatedMemory() {
long nettyPinned = getNettyPinnedDirectMemory();
long sortInUse = sortMemoryCounter.get();
return nettyPinned + sortInUse;
}

/**
 * Sums the pinned direct memory across every pooled allocator registered in
 * {@link NettyUtils}. Pinned memory reflects buffers currently held by callers, unlike
 * used direct memory which also counts chunks the pool retains but has not handed out.
 *
 * @return total pinned direct memory in bytes
 */
public long getNettyPinnedDirectMemory() {
long totalPinned = 0L;
for (PooledByteBufAllocator allocator : NettyUtils.getAllPooledByteBufAllocators()) {
totalPinned += allocator.pinnedDirectMemory();
}
return totalPinned;
}

/**
 * Returns the live counter tracking memory reserved for sort operations.
 *
 * <p>NOTE(review): this exposes the mutable counter itself, not a snapshot — callers can
 * observe concurrent updates.
 *
 * @return the sort-memory counter
 */
public AtomicLong getSortMemoryCounter() {
return sortMemoryCounter;
}
Expand Down

0 comments on commit 5fdc844

Please sign in to comment.