Skip to content

Commit

Permalink
feat: add lease read for DtKV.get(), add comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
areyouok committed Jun 12, 2024
1 parent 3100576 commit c2de02a
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.github.dtprj.dongting.codec.Decoder;
import com.github.dtprj.dongting.codec.Encodable;
import com.github.dtprj.dongting.codec.StrEncoder;
import com.github.dtprj.dongting.common.DtTime;
import com.github.dtprj.dongting.fiber.FiberFuture;
import com.github.dtprj.dongting.fiber.FiberGroup;
import com.github.dtprj.dongting.raft.RaftException;
Expand Down Expand Up @@ -73,7 +74,7 @@ public Object exec(long index, int term, RaftInput input) {
StrEncoder key = (StrEncoder) input.getHeader();
ByteArrayEncoder data = (ByteArrayEncoder) input.getBody();
return switch (input.getBizType()) {
case BIZ_TYPE_GET -> kvStatus.kvImpl.get(key.getStr());
case BIZ_TYPE_GET -> kvStatus.kvImpl.get(index, key.getStr());
case BIZ_TYPE_PUT -> {
kvStatus.kvImpl.put(index, key.getStr(), data.getData(), maxOpenSnapshotIndex);
yield null;
Expand All @@ -84,10 +85,11 @@ public Object exec(long index, int term, RaftInput input) {
}

/**
* read in other threads.
* raft lease read, can read in any threads.
* @see com.github.dtprj.dongting.raft.server.RaftGroup#getLeaseReadIndex(DtTime)
*/
public byte[] get(String key) {
return kvStatus.kvImpl.get(key);
public byte[] get(long index, String key) {
return kvStatus.kvImpl.get(index, key);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,12 @@ protected WriteFrame doProcess(ReqInfo<GetReq> reqInfo) {
ReadFrame<GetReq> frame = reqInfo.getReqFrame();
ReqContext reqContext = reqInfo.getReqContext();
RaftGroup group = reqInfo.getRaftGroup();
group.getLogIndexForRead(reqContext.getTimeout()).whenComplete((logIndex, ex) -> {
group.getLeaseReadIndex(reqContext.getTimeout()).whenComplete((logIndex, ex) -> {
if (ex != null) {
processError(reqInfo, ex);
} else {
DtKV dtKV = (DtKV) group.getStateMachine();
byte[] bytes = dtKV.get(frame.getBody().getKey());
byte[] bytes = dtKV.get(logIndex, frame.getBody().getKey());
ByteBufferWriteFrame wf = new ByteBufferWriteFrame(bytes == null ? null : ByteBuffer.wrap(bytes));
wf.setRespCode(CmdCodes.SUCCESS);
writeResp(reqInfo, wf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,18 @@ class KvImpl {
private final ConcurrentSkipListMap<String, Value> map = new ConcurrentSkipListMap<>();
private final LinkedList<Value> needCleanList = new LinkedList<>();

public byte[] get(String key) {
public byte[] get(long index, String key) {
if (key == null) {
throw new IllegalArgumentException("key is null");
}
Value value = map.get(key);
while (value != null) {
if (value.getRaftIndex() > index) {
value = value.getPrevious();
} else {
break;
}
}
if (value == null) {
return null;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ private void registerCallback(CompletableFuture<?> f, long size) {
}

@Override
public CompletableFuture<Long> getLogIndexForRead(DtTime deadline) {
public CompletableFuture<Long> getLeaseReadIndex(DtTime deadline) {
if (fiberGroup.isShouldStop()) {
return CompletableFuture.failedFuture(new RaftException("raft group thread is stop"));
}
Expand All @@ -140,20 +140,21 @@ public CompletableFuture<Long> getLogIndexForRead(DtTime deadline) {
if (ss.leaseEndNanos - t < 0) {
return CompletableFuture.failedFuture(new NotLeaderException(null));
}
if (ss.groupReadyFuture == null) {

CompletableFuture<Void> groupReadyFuture = ss.groupReadyFuture;
if (groupReadyFuture == null) {
return CompletableFuture.completedFuture(ss.lastApplied);
}

if (ss.groupReadyFuture.isDone()) {
if (groupReadyFuture.isDone()) {
return CompletableFuture.completedFuture(ss.lastApplied);
}
// wait fist commit of applied
return ss.groupReadyFuture.thenCompose(v -> {
return groupReadyFuture.thenCompose(v -> {
if (deadline.isTimeout()) {
return CompletableFuture.failedFuture(new RaftExecTimeoutException());
}
// ss should re-read
return getLogIndexForRead(deadline);
return getLeaseReadIndex(deadline);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,18 @@ public abstract class RaftGroup {

public abstract CompletableFuture<RaftOutput> submitLinearTask(RaftInput input);

public abstract CompletableFuture<Long> getLogIndexForRead(DtTime deadline);
/**
* Get raft lease read index, use this index to read data from the state machine.
* Generally, the future returned by this method should complete immediately,
* however, it may be blocked in some conditions.
*
* <p>NOTE: Lease read is also linearizable.
*
* <li>If current node is not leader, or lease timeout(indicates something wrong),
* the future will complete with a NotLeaderException. </li>
* <li>If can't get the index before deadline, the future will complete with a RaftExecTimeoutException. </li>
*/
public abstract CompletableFuture<Long> getLeaseReadIndex(DtTime deadline);


/**
Expand Down

0 comments on commit c2de02a

Please sign in to comment.