Skip to content

Commit

Permalink
fix: DtKV.remove() may leak entries in map
Browse files Browse the repository at this point in the history
  • Loading branch information
areyouok committed Jun 10, 2024
1 parent deb902d commit 41105f3
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.Map;

/**
* @author huangli
Expand Down Expand Up @@ -103,14 +103,14 @@ public FiberFuture<Void> installSnapshot(long lastIncludeIndex, int lastIncludeT
}
KvImpl kvImpl = kvStatus.kvImpl;
if (data != null && data.hasRemaining()) {
ConcurrentSkipListMap<String, Value> map = kvImpl.getMap();
Map<String, Value> map = kvImpl.getMap();
while (data.hasRemaining()) {
if (encodeStatus.readFromBuffer(data)) {
long raftIndex = encodeStatus.raftIndex;
// TODO keyBytes is temporary object, we should use a pool
String key = new String(encodeStatus.keyBytes, StandardCharsets.UTF_8);
byte[] value = encodeStatus.valueBytes;
map.put(key, new Value(raftIndex, value));
map.put(key, new Value(raftIndex, key, value));
encodeStatus.reset();
} else {
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.github.dtprj.dongting.dtkv.server;

import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

/**
Expand Down Expand Up @@ -44,11 +45,12 @@ public void put(long index, String key, byte[] data, long minOpenSnapshotIndex)
if (data == null) {
throw new IllegalArgumentException("value is null");
}
Value newValue = new Value(index, data);
Value newValue = new Value(index, key, data);
Value oldValue = map.put(key, newValue);
if (minOpenSnapshotIndex != 0 && oldValue != null) {
newValue.setPrevious(oldValue);
needCleanList.add(newValue);
oldValue.setNext(newValue);
needCleanList.add(oldValue);
}
gc(minOpenSnapshotIndex);
}
Expand All @@ -57,12 +59,15 @@ private void gc(long minOpenSnapshotIndex) {
Value value;
LinkedList<Value> needCleanList = this.needCleanList;
while ((value = needCleanList.peekFirst()) != null) {
Value oldValue = value.getPrevious();
if (oldValue.getRaftIndex() >= minOpenSnapshotIndex) {
if (value.getRaftIndex() >= minOpenSnapshotIndex) {
break;
}
value.setPrevious(null);
Value next = value.getNext();
next.setPrevious(null);
needCleanList.removeFirst();
if (next.getData() == null && next.getNext() == null) {
map.remove(next.getKey());
}
}
}

Expand All @@ -79,17 +84,18 @@ public Boolean remove(long index, String key, long minOpenSnapshotIndex) {
gc(minOpenSnapshotIndex);
return false;
} else {
Value newValue = new Value(index, null);
Value newValue = new Value(index, key, null);
newValue.setPrevious(oldValue);
oldValue.setNext(newValue);
map.put(key, newValue);
needCleanList.add(newValue);
needCleanList.add(oldValue);
gc(minOpenSnapshotIndex);
return oldValue.getData() != null;
}
}
}

public ConcurrentSkipListMap<String, Value> getMap() {
public Map<String, Value> getMap() {
return map;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,17 @@
* @author huangli
*/
class Value {
private final String key;
private final byte[] data;
private final long raftIndex;

private Value previous;
private Value next;

public Value(long raftIndex, byte[] data) {
public Value(long raftIndex,String key, byte[] data) {
this.raftIndex = raftIndex;
this.data = data;
this.key = key;
}

public byte[] getData() {
Expand All @@ -44,4 +47,16 @@ public Value getPrevious() {
public void setPrevious(Value previous) {
this.previous = previous;
}

public void setNext(Value next) {
this.next = next;
}

public Value getNext() {
return next;
}

public String getKey() {
return key;
}
}

0 comments on commit 41105f3

Please sign in to comment.