Skip to content

Commit

Permalink
refactor: use LinkedList in WaitSource.
Browse files Browse the repository at this point in the history
This commit is prepare for multi-condition wait.
  • Loading branch information
areyouok committed Dec 25, 2024
1 parent e97df5a commit 9d3e843
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 166 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.PriorityQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -224,7 +225,8 @@ private void processScheduleFibers() {
continue;
}
if (f.source != null) {
f.source.removeWaiter(f);
// assert waiters must not null, because source is not null
f.source.waiters.remove(f);
f.source.prepare(f, true);
}
f.cleanSchedule();
Expand Down Expand Up @@ -424,13 +426,20 @@ static FrameCallResult awaitOn(WaitSource c, long millis, FrameCall resumePoint)
static FrameCallResult awaitOn(Fiber fiber, WaitSource c, long millis, FrameCall resumePoint) {
checkInterrupt(fiber);
checkReentry(fiber);
return awaitOn0(fiber, c, millis, resumePoint);
}

private static FrameCallResult awaitOn0(Fiber fiber, WaitSource c, long millis, FrameCall resumePoint) {
FiberFrame currentFrame = fiber.stackTop;
currentFrame.resumePoint = resumePoint;
fiber.source = c;
fiber.scheduleTimeoutMillis = millis;
fiber.ready = false;
fiber.fiberGroup.dispatcher.addToScheduleQueue(millis, fiber);
c.addWaiter(fiber);
if (c.waiters == null) {
c.waiters = new LinkedList<>();
}
c.waiters.addLast(fiber);
return FrameCallResult.SUSPEND;
}

Expand All @@ -455,15 +464,9 @@ static void sleepUntilShouldStop(long millis, FrameCall<Void> resumePoint) {
FiberFrame currentFrame = fiber.stackTop;
currentFrame.resumePoint = resumePoint;
fiber.ready = false;
return;
} else {
awaitOn0(fiber, g.shouldStopCondition, millis, resumePoint);
}
FiberFrame currentFrame = fiber.stackTop;
currentFrame.resumePoint = resumePoint;
fiber.scheduleTimeoutMillis = millis;
fiber.source = g.shouldStopCondition;
g.shouldStopCondition.addWaiter(fiber);
fiber.ready = false;
fiber.fiberGroup.dispatcher.addToScheduleQueue(millis, fiber);
}

static void yield(FrameCall<Void> resumePoint) {
Expand Down Expand Up @@ -534,7 +537,8 @@ void interrupt(Fiber fiber) {
String str;
if (fiber.source != null) {
WaitSource s = fiber.source;
s.removeWaiter(fiber);
// assert waiters is not null
s.waiters.remove(fiber);
fiber.source = null;
str = s.toString();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@ public class Fiber extends WaitSource {
long scheduleTimeoutMillis;
long scheduleNanoTime;

Fiber previousWaiter;
Fiber nextWaiter;

boolean started;
boolean ready;
boolean finished;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ public void signal(Fiber targetFiber) {
if (targetFiber.source != this) {
return;
}
removeWaiter(targetFiber);
// assert waiters not null
waiters.remove(targetFiber);
signalFiber(targetFiber, true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,7 @@ public void unlock() {
heldCount--;
if (heldCount <= 0) {
owner = null;
if (firstWaiter != null) {
signal0(true);
} else if (readLock.firstWaiter != null) {
if (!signal0(true)) {
readLock.signalAll0(true);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,7 @@ public void unlock() {
// check fiber held this read lock?
heldCount--;
if (heldCount <= 0) {
if (writeLock.firstWaiter != null) {
writeLock.signal0(true);
}
writeLock.signal0(true);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@
*/
package com.github.dtprj.dongting.fiber;

import java.util.LinkedList;

/**
* @author huangli
*/
abstract class WaitSource {
protected final String name;
private Fiber lastWaiter;
Fiber firstWaiter;
LinkedList<Fiber> waiters;
protected final FiberGroup fiberGroup;

public WaitSource(String name, FiberGroup group) {
Expand All @@ -31,74 +32,16 @@ public WaitSource(String name, FiberGroup group) {

protected abstract void prepare(Fiber waitFiber, boolean timeout);


void addWaiter(Fiber f) {
if (firstWaiter == null) {
firstWaiter = f;
} else {
lastWaiter.nextWaiter = f;
f.previousWaiter = lastWaiter;
}
lastWaiter = f;
}

void removeWaiter(Fiber f) {
if (f == firstWaiter) {
if (f == lastWaiter) {
firstWaiter = null;
lastWaiter = null;
} else {
firstWaiter = f.nextWaiter;
firstWaiter.previousWaiter = null;
}
} else if (f == lastWaiter) {
lastWaiter = f.previousWaiter;
lastWaiter.nextWaiter = null;
} else {
f.previousWaiter.nextWaiter = f.nextWaiter;
f.nextWaiter.previousWaiter = f.previousWaiter;
}
f.nextWaiter = null;
f.previousWaiter = null;
}

Fiber popTailWaiter() {
Fiber result = lastWaiter;
if (result != null) {
if (result == firstWaiter) {
firstWaiter = null;
lastWaiter = null;
} else {
lastWaiter = result.previousWaiter;
lastWaiter.nextWaiter = null;
result.previousWaiter = null;
}
}
return result;
}

Fiber popHeadWaiter() {
Fiber result = firstWaiter;
if (result != null) {
if (result == lastWaiter) {
firstWaiter = null;
lastWaiter = null;
} else {
firstWaiter = result.nextWaiter;
firstWaiter.previousWaiter = null;
result.nextWaiter = null;
}
}
return result;
}

void signal0(boolean addFirst) {
boolean signal0(boolean addFirst) {
if (fiberGroup.finished) {
return;
return false;
}
Fiber f = popHeadWaiter();
if (f != null) {
Fiber f;
if (waiters != null && (f = waiters.pollFirst()) != null) {
signalFiber(f, addFirst);
return true;
} else {
return false;
}
}

Expand All @@ -115,13 +58,17 @@ void signalAll0(boolean addFirst) {
if (fiberGroup.finished) {
return;
}
LinkedList<Fiber> waiters = this.waiters;
if (waiters == null) {
return;
}
Fiber f;
if (addFirst) {
while ((f = popTailWaiter()) != null) {
while ((f = waiters.pollLast()) != null) {
signalFiber(f, true);
}
} else {
while ((f = popHeadWaiter()) != null) {
while ((f = waiters.pollFirst()) != null) {
signalFiber(f, false);
}
}
Expand Down

This file was deleted.

0 comments on commit 9d3e843

Please sign in to comment.