Skip to content

Commit

Permalink
feat: pool clean
Browse files Browse the repository at this point in the history
  • Loading branch information
areyouok committed Jun 13, 2024
1 parent a1efaba commit 91e20f5
Show file tree
Hide file tree
Showing 9 changed files with 43 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
*/
package com.github.dtprj.dongting.buf;

import com.github.dtprj.dongting.common.DtUtil;
import com.github.dtprj.dongting.common.Timestamp;

import java.util.concurrent.TimeUnit;

import static com.github.dtprj.dongting.buf.SimpleByteBufferPool.calcTotalSize;

/**
Expand All @@ -40,6 +43,14 @@ public class DefaultPoolFactory implements PoolFactory {
private static final SimpleByteBufferPool GLOBAL_DIRECT_POOL = createGlobalPool(true);
private static final SimpleByteBufferPool GLOBAL_HEAP_POOL = createGlobalPool(false);

static {
Runnable r = () -> {
GLOBAL_DIRECT_POOL.clean();
GLOBAL_HEAP_POOL.clean();
};
DtUtil.SCHEDULED_SERVICE.scheduleWithFixedDelay(r, 1, 1, TimeUnit.SECONDS);
}

private static SimpleByteBufferPool createGlobalPool(boolean direct) {
// Thread safe pool should use a dedicated timestamp
SimpleByteBufferPoolConfig c = new SimpleByteBufferPoolConfig(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@ public class SimpleByteBufferPool extends ByteBufferPool {
16, 8, 4, 2, 1, 0, 0,
0};

public static final long DEFAULT_TIME_OUT_MILLIS = 10 * 1000;

public SimpleByteBufferPool(Timestamp ts, boolean direct, int threshold) {
this(new SimpleByteBufferPoolConfig(ts, direct, threshold, false));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class SimpleByteBufferPoolConfig {
private int[] bufSizes = SimpleByteBufferPool.DEFAULT_BUF_SIZE;
private int[] minCount = SimpleByteBufferPool.DEFAULT_MIN_COUNT;
private int[] maxCount = SimpleByteBufferPool.DEFAULT_MAX_COUNT;
private long timeoutMillis = SimpleByteBufferPool.DEFAULT_TIME_OUT_MILLIS;
private long timeoutMillis = 10 * 1000;
private long shareSize = 0;

public SimpleByteBufferPoolConfig() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ public ByteBuffer allocate(int requestSize) {
@Override
public void clean() {
smallPool.clean();
largePool.clean();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import com.github.dtprj.dongting.log.DtLogs;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

/**
* @author huangli
Expand All @@ -34,6 +36,12 @@ public class DtUtil {
private static int CPU_COUNT = RUNTIME.availableProcessors();
private static int cpuCountInvoke;

public final static ScheduledExecutorService SCHEDULED_SERVICE = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r, "DtRaftSchedule");
t.setDaemon(true);
return t;
});

public static void close(AutoCloseable... resources) {
for (AutoCloseable res : resources) {
close(res);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,23 @@ private FrameCallResult initRaftFibers() {
gc.getVoteManager().startFiber();
gc.getApplyManager().init(getFiberGroup());
gc.getSnapshotManager().startFiber();
Fiber f = new Fiber("cleaner", groupConfig.getFiberGroup(), cleanFiberFrame(), true);
f.start();
return Fiber.frameReturn();
}

private FiberFrame<Void> cleanFiberFrame() {
return new FiberFrame<>() {
@Override
public FrameCallResult execute(Void input) {
groupConfig.getHeapPool().getPool().clean();
return Fiber.yield(this::exec2);
}
private FrameCallResult exec2(Void unused) {
groupConfig.getDirectPool().clean();
return Fiber.sleepUntilShouldStop(1000, this);
}
};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.github.dtprj.dongting.common.AbstractLifeCircle;
import com.github.dtprj.dongting.common.DtTime;
import com.github.dtprj.dongting.common.DtUtil;
import com.github.dtprj.dongting.common.IntObjMap;
import com.github.dtprj.dongting.fiber.FiberFuture;
import com.github.dtprj.dongting.fiber.FiberGroup;
Expand Down Expand Up @@ -85,7 +86,7 @@ private CompletableFuture<RaftNodeEx> addToNioClient(RaftNode node) {

@Override
protected void doStart() {
this.scheduledFuture = RaftUtil.SCHEDULED_SERVICE.scheduleWithFixedDelay(
this.scheduledFuture = DtUtil.SCHEDULED_SERVICE.scheduleWithFixedDelay(
this::tryNodePingAll, 0, 2, TimeUnit.SECONDS);
}

Expand Down Expand Up @@ -154,7 +155,7 @@ private CompletableFuture<Void> nodePing(RaftNodeEx nodeEx, Consumer<Throwable>
nodeEx.setPinging(true);
// we should set connecting status in schedule thread
return sendNodePing(nodeEx).whenCompleteAsync(
(v, ex) -> processResultInScheduleThread(nodeEx, ex, extraCallback), RaftUtil.SCHEDULED_SERVICE);
(v, ex) -> processResultInScheduleThread(nodeEx, ex, extraCallback), DtUtil.SCHEDULED_SERVICE);
}

private void processResultInScheduleThread(RaftNodeEx nodeEx, Throwable ex, Consumer<Throwable> extraCallback) {
Expand Down Expand Up @@ -248,7 +249,7 @@ private List<RaftNodeEx> checkNodeIdSet(Set<Integer> nodeIds) {

private <T> FiberFuture<T> runInScheduleThread(String futureName, Supplier<T> supplier) {
FiberFuture<T> f = FiberGroup.currentGroup().newFuture(futureName);
RaftUtil.SCHEDULED_SERVICE.execute(() -> {
DtUtil.SCHEDULED_SERVICE.execute(() -> {
try {
f.fireComplete(supplier.get());
} catch (Throwable e) {
Expand Down Expand Up @@ -315,13 +316,13 @@ public CompletableFuture<RaftNodeEx> addNode(RaftNode node) {
log.error("", unexpected);
f.completeExceptionally(unexpected);
}
}, RaftUtil.SCHEDULED_SERVICE);
}, DtUtil.SCHEDULED_SERVICE);
return f;
}

public CompletableFuture<Void> removeNode(int nodeId) {
CompletableFuture<Void> f = new CompletableFuture<>();
RaftUtil.SCHEDULED_SERVICE.execute(() -> {
DtUtil.SCHEDULED_SERVICE.execute(() -> {
try {
RaftNodeEx existNode = allNodesEx.get(nodeId);
if (existNode == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,13 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.zip.CRC32C;

/**
* @author huangli
*/
public final class RaftUtil {
private static final DtLog log = DtLogs.getLogger(RaftUtil.class);
public final static ScheduledExecutorService SCHEDULED_SERVICE = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r, "DtRaftSchedule");
t.setDaemon(true);
return t;
});

public static void updateCrc(CRC32C crc32c, ByteBuffer buf, int startPos, int len) {
if (len == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,7 @@ public CompletableFuture<Void> addGroup(RaftGroupConfig groupConfig, long acquir
"group already exist: " + groupConfig.getGroupId()));
}
CompletableFuture<RaftGroupImpl> f = new CompletableFuture<>();
RaftUtil.SCHEDULED_SERVICE.execute(() -> {
DtUtil.SCHEDULED_SERVICE.execute(() -> {
try {
RaftGroupImpl g = createRaftGroup(serverConfig, nodeManager.getAllNodeIds(), groupConfig);
g.getGroupComponents().getMemberManager().init(nodeManager.getAllNodesEx());
Expand Down

0 comments on commit 91e20f5

Please sign in to comment.