Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CELEBORN-1792] MemoryManager resume should use pinnedDirectMemory instead of usedDirectMemory #3018

Closed
wants to merge 18 commits into from

Conversation

leixm
Copy link
Contributor

@leixm leixm commented Dec 20, 2024

What changes were proposed in this pull request?

Congestion and MemoryManager should use pinnedDirectMemory instead of usedDirectMemory

Why are the changes needed?

In our production environment, after worker pausing, the usedDirectMemory keep high and does not decrease. The worker node is permanently blacklisted and cannot be used.

This problem has been bothering us for a long time. When the thred cache is turned off, in fact, after ctx.channel().config().setAutoRead(false), the netty framework will still hold some ByteBufs. This part of ByteBuf result in a lot of PoolChunks cannot be released.

In netty, if a chunk is 16M and 8k of this chunk has been allocated, then the pinnedMemory is 8k and the activeMemory is 16M. The remaining (16M-8k) memory can be allocated, but not yet allocated, netty allocates and releases memory in chunk units, so the 8k that has been allocated will result in 16M that cannot be returned to the operating system.

Here are some scenes from our production/test environment:

We config 10gb off-heap memory for worker, other configs as below:

celeborn.network.memory.allocator.allowCache                         false
celeborn.worker.monitor.memory.check.interval                         100ms
celeborn.worker.monitor.memory.report.interval                        10s
celeborn.worker.directMemoryRatioToPauseReceive                       0.75
celeborn.worker.directMemoryRatioToPauseReplicate                     0.85
celeborn.worker.directMemoryRatioToResume                             0.5

When receiving high traffic, the worker's usedDirectMemory increases. After triggering trim and pause, usedDirectMemory still does not reach the resume threshold, and worker was excluded.

image

So we checked the heap snapshot of the abnormal worker, we can see that there are a large number of DirectByteBuffers in the heap memory. These DirectByteBuffers are all 4mb in size, which is exactly the size of chunksize. According to the path to gc root, DirectByteBuffer is held by PoolChunk, and these 4m only have 160k pinnedBytes.

image

image

There are many ByteBufs that are not released

image

The stack shows that these ByteBufs are allocated by netty
image

We tried to reproduce this situation in the test environment. When the same problem occurred, we added a restful api of the worker to force the worker to resume. After the resume, the worker returned to normal, and PushDataHandler handled many delayed requests.

image

image

So I think that when pinnedMemory is not high enough, we should not trigger pause and congestion, because at this time a large part of the memory can still be allocated.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing UTs.

@leixm
Copy link
Contributor Author

leixm commented Dec 20, 2024

Before optimization

image

After optimization

image

@FMX
Copy link
Contributor

FMX commented Dec 21, 2024

I have a question. Will the worker OOM when the pinned memory is high? In our previous implementations, the direct memory counter will count all direct memory allocated, whether or not the allocator can allocate.
Can you do a pressure test for this scenario?

pan3793 added a commit that referenced this pull request Jan 2, 2025
### What changes were proposed in this pull request?

This PR introduces a configuration `celeborn.network.memory.allocator.pooled` to allow users to disable `PooledByteBufAllocator` globally and always use `UnpooledByteBufAllocator`.

### Why are the changes needed?

In some extreme cases, the Netty's `PooledByteBufAllocator` might have tons of 4MiB chunks but only a few sizes of the capacity are used by the real data(see #3018), for scenarios that stability is important than performance, it's desirable to allow users to disable the `PooledByteBufAllocator` globally.

### Does this PR introduce _any_ user-facing change?

Add a new feature, disabled by default.

### How was this patch tested?

Pass UT to ensure correctness. Performance and memory impact need to be verified in the production scale cluster.

Closes #3043 from pan3793/CELEBORN-1815.

Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
@RexXiong
Copy link
Contributor

RexXiong commented Jan 3, 2025

I discussed this with FMX offline, and we believe it might be better to use pinnedDirectMemory for worker resumption while keeping usedDirectMemory for trimming. @leixm @AngersZhuuuu WDYT?

@leixm
Copy link
Contributor Author

leixm commented Jan 3, 2025

I discussed this with FMX offline, and we believe it might be better to use pinnedDirectMemory for worker resumption while keeping usedDirectMemory for trimming. @leixm @AngersZhuuuu WDYT?

It's okay for me. @AngersZhuuuu WDYT?

@FMX
Copy link
Contributor

FMX commented Jan 3, 2025

Don't forget to update the default value of this config "celeborn.worker.directMemoryRatioToResume" to 0.3.

@leixm leixm changed the title [CELEBORN-1792] Congestion and MemoryManager should use pinnedDirectMemory instead of usedDirectMemory [CELEBORN-1792] MemoryManager resume should use pinnedDirectMemory instead of usedDirectMemory Jan 12, 2025
allocatedMemory = memoryUsage;
}
// trigger resume
// CELEBORN-1792: resume should use pinnedDirectMemory instead of usedDirectMemory
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although we needn't change to pause state, it would be better to call trim when netty direct memory used above pausePushDataThreshold/pauseReplicateThreshold, WDYT?

@@ -3858,7 +3858,7 @@ object CelebornConf extends Logging {
.doc("If direct memory usage is less than this limit, worker will resume.")
.version("0.2.0")
.doubleConf
.createWithDefault(0.7)
.createWithDefault(0.3)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can add a new conf for pinnedMemoryToResume and keep exist conf for directMemoryRatioToResume

Copy link
Contributor

@RexXiong RexXiong left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

btw, Can we do some test for this?

if (pinnedMemoryCheckEnabled
&& System.currentTimeMillis() >= pinnedMemoryNextCheckTime
&& getAllocatedMemory() / (double) (maxDirectMemory) < pinnedMemoryResumeRatio) {
pinnedMemoryNextCheckTime += pinnedMemoryCheckInterval;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pinnedMemoryNextCheckTime compute seems incorrect, We can Use System.currentTimeMillis() as last checkTime, then we can use System.currentTimeMillis()-lastCheckTime >= pinnedMemoryCheckInterval to check whether need resume.

@@ -93,6 +96,9 @@ public class MemoryManager {
private long memoryFileStorageThreshold;
private final LongAdder memoryFileStorageCounter = new LongAdder();
private final StorageManager storageManager;
private boolean pinnedMemoryCheckEnabled;
private long pinnedMemoryCheckInterval;
private long pinnedMemoryLastCheckTime = 0L;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default value for this is 0.

@@ -282,7 +292,7 @@ private MemoryManager(CelebornConf conf, StorageManager storageManager, Abstract
Utils.bytesToString(readBufferThreshold),
Utils.bytesToString(readBufferTarget),
Utils.bytesToString(memoryFileStorageThreshold),
resumeRatio);
directMemoryResumeRatio);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can add pinned memory resume ratio here. It is an important parameter for memory manager.

@@ -436,6 +445,16 @@ public long getMemoryUsage() {
return getNettyUsedDirectMemory() + sortMemoryCounter.get();
}

public long getAllocatedMemory() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method should be renamed to getPinnedMemory. The allocated memory is the netty memory counter.

@@ -93,6 +96,9 @@ public class MemoryManager {
private long memoryFileStorageThreshold;
private final LongAdder memoryFileStorageCounter = new LongAdder();
private final StorageManager storageManager;
private boolean pinnedMemoryCheckEnabled;
private long pinnedMemoryCheckInterval;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To avoid frequently calling a pinned memory counter, I think you can cache the last pinned memory value and refresh it periodically. And exporting the pinned memory value to the metrics.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is another PR introducing pinnedMemory metrics #3019

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getPinnedMemory is not called very frequently. It is called once every pinnedMemoryCheckInterval. The default is 10 seconds.

logger.debug("Trigger action: TRIM");
trimCounter += 1;
// force to append pause spent time even we are in pause state
trimAllListeners();
if (trimCounter >= forceAppendPauseSpentTimeThreshold) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lost trimCounter+=1

memoryPressureListeners.forEach(
memoryPressureListener ->
memoryPressureListener.onPause(TransportModuleConstants.REPLICATE_MODULE));
logger.debug("Trigger action: TRIM");
trimAllListeners();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Contributor

@RexXiong RexXiong left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks @leixm

@RexXiong RexXiong closed this in 9131c1e Jan 22, 2025
@RexXiong
Copy link
Contributor

Thanks, merge to main(v0.6.0)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants