diff --git a/server/src/main/java/com/github/dtprj/dongting/fiber/Dispatcher.java b/server/src/main/java/com/github/dtprj/dongting/fiber/Dispatcher.java index 9ddd9974..57e49772 100644 --- a/server/src/main/java/com/github/dtprj/dongting/fiber/Dispatcher.java +++ b/server/src/main/java/com/github/dtprj/dongting/fiber/Dispatcher.java @@ -33,6 +33,7 @@ import java.lang.invoke.MethodHandles; import java.lang.invoke.VarHandle; import java.util.ArrayList; +import java.util.LinkedList; import java.util.PriorityQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -224,7 +225,8 @@ private void processScheduleFibers() { continue; } if (f.source != null) { - f.source.removeWaiter(f); + // assert waiters must not null, because source is not null + f.source.waiters.remove(f); f.source.prepare(f, true); } f.cleanSchedule(); @@ -424,13 +426,20 @@ static FrameCallResult awaitOn(WaitSource c, long millis, FrameCall resumePoint) static FrameCallResult awaitOn(Fiber fiber, WaitSource c, long millis, FrameCall resumePoint) { checkInterrupt(fiber); checkReentry(fiber); + return awaitOn0(fiber, c, millis, resumePoint); + } + + private static FrameCallResult awaitOn0(Fiber fiber, WaitSource c, long millis, FrameCall resumePoint) { FiberFrame currentFrame = fiber.stackTop; currentFrame.resumePoint = resumePoint; fiber.source = c; fiber.scheduleTimeoutMillis = millis; fiber.ready = false; fiber.fiberGroup.dispatcher.addToScheduleQueue(millis, fiber); - c.addWaiter(fiber); + if (c.waiters == null) { + c.waiters = new LinkedList<>(); + } + c.waiters.addLast(fiber); return FrameCallResult.SUSPEND; } @@ -455,15 +464,9 @@ static void sleepUntilShouldStop(long millis, FrameCall resumePoint) { FiberFrame currentFrame = fiber.stackTop; currentFrame.resumePoint = resumePoint; fiber.ready = false; - return; + } else { + awaitOn0(fiber, g.shouldStopCondition, millis, resumePoint); } - FiberFrame currentFrame = fiber.stackTop; - currentFrame.resumePoint = resumePoint; - fiber.scheduleTimeoutMillis = millis; - fiber.source = g.shouldStopCondition; - g.shouldStopCondition.addWaiter(fiber); - fiber.ready = false; - fiber.fiberGroup.dispatcher.addToScheduleQueue(millis, fiber); } static void yield(FrameCall resumePoint) { @@ -534,7 +537,8 @@ void interrupt(Fiber fiber) { String str; if (fiber.source != null) { WaitSource s = fiber.source; - s.removeWaiter(fiber); + // assert waiters is not null + s.waiters.remove(fiber); fiber.source = null; str = s.toString(); } else { diff --git a/server/src/main/java/com/github/dtprj/dongting/fiber/Fiber.java b/server/src/main/java/com/github/dtprj/dongting/fiber/Fiber.java index 7e14db65..bb65a602 100644 --- a/server/src/main/java/com/github/dtprj/dongting/fiber/Fiber.java +++ b/server/src/main/java/com/github/dtprj/dongting/fiber/Fiber.java @@ -29,9 +29,6 @@ public class Fiber extends WaitSource { long scheduleTimeoutMillis; long scheduleNanoTime; - Fiber previousWaiter; - Fiber nextWaiter; - boolean started; boolean ready; boolean finished; diff --git a/server/src/main/java/com/github/dtprj/dongting/fiber/FiberCondition.java b/server/src/main/java/com/github/dtprj/dongting/fiber/FiberCondition.java index 7fd04e01..35daf458 100644 --- a/server/src/main/java/com/github/dtprj/dongting/fiber/FiberCondition.java +++ b/server/src/main/java/com/github/dtprj/dongting/fiber/FiberCondition.java @@ -54,7 +54,8 @@ public void signal(Fiber targetFiber) { if (targetFiber.source != this) { return; } - removeWaiter(targetFiber); + // assert waiters not null + waiters.remove(targetFiber); signalFiber(targetFiber, true); } diff --git a/server/src/main/java/com/github/dtprj/dongting/fiber/FiberLock.java b/server/src/main/java/com/github/dtprj/dongting/fiber/FiberLock.java index 018dfa17..8218df10 100644 --- a/server/src/main/java/com/github/dtprj/dongting/fiber/FiberLock.java +++ b/server/src/main/java/com/github/dtprj/dongting/fiber/FiberLock.java @@ -103,9 +103,7 @@ public void unlock() { heldCount--; if (heldCount <= 0) { owner = null; - if (firstWaiter != null) { - signal0(true); - } else if (readLock.firstWaiter != null) { + if (!signal0(true)) { readLock.signalAll0(true); } } diff --git a/server/src/main/java/com/github/dtprj/dongting/fiber/FiberReadLock.java b/server/src/main/java/com/github/dtprj/dongting/fiber/FiberReadLock.java index 8120195e..57d51796 100644 --- a/server/src/main/java/com/github/dtprj/dongting/fiber/FiberReadLock.java +++ b/server/src/main/java/com/github/dtprj/dongting/fiber/FiberReadLock.java @@ -85,9 +85,7 @@ public void unlock() { // check fiber held this read lock? heldCount--; if (heldCount <= 0) { - if (writeLock.firstWaiter != null) { - writeLock.signal0(true); - } + writeLock.signal0(true); } } } diff --git a/server/src/main/java/com/github/dtprj/dongting/fiber/WaitSource.java b/server/src/main/java/com/github/dtprj/dongting/fiber/WaitSource.java index f9bacacb..84d8479e 100644 --- a/server/src/main/java/com/github/dtprj/dongting/fiber/WaitSource.java +++ b/server/src/main/java/com/github/dtprj/dongting/fiber/WaitSource.java @@ -15,13 +15,14 @@ */ package com.github.dtprj.dongting.fiber; +import java.util.LinkedList; + /** * @author huangli */ abstract class WaitSource { protected final String name; - private Fiber lastWaiter; - Fiber firstWaiter; + LinkedList waiters; protected final FiberGroup fiberGroup; public WaitSource(String name, FiberGroup group) { @@ -31,74 +32,16 @@ public WaitSource(String name, FiberGroup group) { protected abstract void prepare(Fiber waitFiber, boolean timeout); - - void addWaiter(Fiber f) { - if (firstWaiter == null) { - firstWaiter = f; - } else { - lastWaiter.nextWaiter = f; - f.previousWaiter = lastWaiter; - } - lastWaiter = f; - } - - void removeWaiter(Fiber f) { - if (f == firstWaiter) { - if (f == lastWaiter) { - firstWaiter = null; - lastWaiter = null; - } else { - firstWaiter = f.nextWaiter; - firstWaiter.previousWaiter = null; - } - } else if (f == lastWaiter) { - lastWaiter = f.previousWaiter; - lastWaiter.nextWaiter = null; - } else { - f.previousWaiter.nextWaiter = f.nextWaiter; - f.nextWaiter.previousWaiter = f.previousWaiter; - } - f.nextWaiter = null; - f.previousWaiter = null; - } - - Fiber popTailWaiter() { - Fiber result = lastWaiter; - if (result != null) { - if (result == firstWaiter) { - firstWaiter = null; - lastWaiter = null; - } else { - lastWaiter = result.previousWaiter; - lastWaiter.nextWaiter = null; - result.previousWaiter = null; - } - } - return result; - } - - Fiber popHeadWaiter() { - Fiber result = firstWaiter; - if (result != null) { - if (result == lastWaiter) { - firstWaiter = null; - lastWaiter = null; - } else { - firstWaiter = result.nextWaiter; - firstWaiter.previousWaiter = null; - result.nextWaiter = null; - } - } - return result; - } - - void signal0(boolean addFirst) { + boolean signal0(boolean addFirst) { if (fiberGroup.finished) { - return; + return false; } - Fiber f = popHeadWaiter(); - if (f != null) { + Fiber f; + if (waiters != null && (f = waiters.pollFirst()) != null) { signalFiber(f, addFirst); + return true; + } else { + return false; } } @@ -115,13 +58,17 @@ void signalAll0(boolean addFirst) { if (fiberGroup.finished) { return; } + LinkedList waiters = this.waiters; + if (waiters == null) { + return; + } Fiber f; if (addFirst) { - while ((f = popTailWaiter()) != null) { + while ((f = waiters.pollLast()) != null) { signalFiber(f, true); } } else { - while ((f = popHeadWaiter()) != null) { + while ((f = waiters.pollFirst()) != null) { signalFiber(f, false); } } diff --git a/server/src/test/java/com/github/dtprj/dongting/fiber/WaitSourceTest.java b/server/src/test/java/com/github/dtprj/dongting/fiber/WaitSourceTest.java deleted file mode 100644 index 5ece6eb7..00000000 --- a/server/src/test/java/com/github/dtprj/dongting/fiber/WaitSourceTest.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright The Dongting Project - * - * The Dongting Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package com.github.dtprj.dongting.fiber; - -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -/** - * @author huangli - */ -public class WaitSourceTest { - private FiberFuture future; - private Fiber f1; - private Fiber f2; - private Fiber f3; - private Fiber f4; - - @BeforeEach - public void setUp() { - Dispatcher dispatcher = new Dispatcher("name"); - FiberGroup group = new FiberGroup("g", dispatcher); - future = group.newFuture("noName"); - f1 = new Fiber("f1", group, new EmptyFiberFrame()); - f2 = new Fiber("f2", group, new EmptyFiberFrame()); - f3 = new Fiber("f3", group, new EmptyFiberFrame()); - f4 = new Fiber("f4", group, new EmptyFiberFrame()); - } - - @Test - public void testPopHead() { - future.addWaiter(f1); - future.addWaiter(f2); - future.addWaiter(f3); - Assertions.assertSame(f1, future.popHeadWaiter()); - Assertions.assertSame(f2, future.popHeadWaiter()); - Assertions.assertSame(f3, future.popHeadWaiter()); - } - - @Test - public void testPopTail(){ - future.addWaiter(f1); - future.addWaiter(f2); - future.addWaiter(f3); - Assertions.assertSame(f3, future.popTailWaiter()); - Assertions.assertSame(f2, future.popTailWaiter()); - Assertions.assertSame(f1, future.popTailWaiter()); - } - - @Test - public void testRemove() { - future.addWaiter(f1); - future.addWaiter(f2); - future.addWaiter(f3); - future.addWaiter(f4); - future.removeWaiter(f2);//mid - future.removeWaiter(f1);//head - future.removeWaiter(f4);//tail - future.removeWaiter(f3);//last one - Assertions.assertNull(future.popHeadWaiter()); - Assertions.assertNull(future.popTailWaiter()); - } -}