Skip to content

Commit

Permalink
[uot|nexus] use two connections for one udp stream
Browse files Browse the repository at this point in the history
  • Loading branch information
wkgcass committed Aug 4, 2024
1 parent 53804e9 commit 8da83e3
Show file tree
Hide file tree
Showing 19 changed files with 584 additions and 276 deletions.
10 changes: 7 additions & 3 deletions base/src/main/java/io/vproxy/base/connection/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.nio.channels.CancelledKeyException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

public class Connection implements NetFlowRecorder {
/**
Expand Down Expand Up @@ -495,10 +496,13 @@ public void UNSAFE_replaceBuffer(RingBuffer in, RingBuffer out, boolean cleanBuf
this.outBuffer = out;
}

public void runNoQuickWrite(Runnable r) {
public void runNoQuickWrite(Consumer<Connection> r) {
noQuickWrite = true;
r.run();
noQuickWrite = false;
try {
r.accept(this);
} finally {
noQuickWrite = false;
}
}

public static Connection wrap(SocketFD fd,
Expand Down
9 changes: 6 additions & 3 deletions base/src/main/java/io/vproxy/base/util/ByteArray.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,8 @@ static ByteArray from(ByteBuffer buf) {
if (buf.hasArray()) {
return from(buf.array()).sub(buf.position(), len);
} else {
byte[] array = Utils.allocateByteArray(len);
buf.get(array);
return from(array);
var seg = MemorySegment.ofBuffer(buf);
return new MemorySegmentByteArray(seg);
}
}

Expand Down Expand Up @@ -132,6 +131,10 @@ default ByteArrayChannel toFullChannel() {
return ByteArrayChannel.fromFull(this);
}

default ByteArrayChannel toEmptyChannel() {
return ByteArrayChannel.fromEmpty(this);
}

default int uint24(int offset) {
return uint8(offset) << 16 | uint8(offset + 1) << 8 | uint8(offset + 2);
}
Expand Down
8 changes: 8 additions & 0 deletions base/src/main/java/io/vproxy/base/util/RingBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ default int storeBytesFrom(ByteArrayChannel channel) {
}
}

default int storeBytesFrom(ByteArray array) {
return storeBytesFrom(array.toFullChannel());
}

int storeBytesFrom(ReadableByteStream channel) throws IOException;

default int storeBytesFrom(ByteBuffer buf) {
Expand All @@ -50,6 +54,10 @@ default int writeTo(ByteArrayChannel channel) {
}
}

default int writeTo(ByteArray array) {
return writeTo(array.toEmptyChannel());
}

default int writeTo(WritableByteStream channel) throws IOException {
return writeTo(channel, Integer.MAX_VALUE);
}
Expand Down
10 changes: 9 additions & 1 deletion base/src/main/java/io/vproxy/base/util/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -765,7 +765,11 @@ public static ByteArray buildPseudoIPv6Header(Ipv6Packet ipv6, int upperType, in
}

public static int calculateChecksum(ByteArray array, int limit) {
int sum = 0;
var intermediate = calculateChecksumIntermediate(0, array, limit);
return calculateChecksumDoFinal(intermediate);
}

public static int calculateChecksumIntermediate(int sum, ByteArray array, int limit) {
for (int i = 0; i < limit / 2; ++i) {
sum += array.uint16(i * 2);
while (sum > 0xffff) {
Expand All @@ -778,6 +782,10 @@ public static int calculateChecksum(ByteArray array, int limit) {
sum = (sum & 0xffff) + 1;
}
}
return sum;
}

public static int calculateChecksumDoFinal(int sum) {
return 0xffff - sum;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,23 +86,9 @@ public Partition(int capacity) {
}

public boolean add(E e) {
return add(e, 1);
}

private boolean add(E e, int retry) {
if (retry > 10) { // max retry for 10 times
return false; // too many retries
}

StorageArray<E> read = this.read.get();
StorageArray<E> write = this.write;

// read and write may be the same when they are swapping
if (read == write) {
// is swapping, try again
return add(e, retry + 1);
}

// adding is always safe
//noinspection RedundantIfStatement
if (write.add(e)) {
return true;
Expand All @@ -123,12 +109,7 @@ private E poll(int retry) {
StorageArray<E> read = this.read.get();
StorageArray<E> write = this.write;

// read and write may be the same when they are swapping
if (read == write) {
// is swapping, try again
return poll(retry + 1);
}

// polling is always safe
E ret = read.poll();
if (ret != null) {
return ret;
Expand All @@ -138,13 +119,12 @@ private E poll(int retry) {
// check whether we can swap (whether $write is full)

int writeEnd = write.end.get();
int writeEndIndicator = write.endIndicator.get();
if (writeEnd < write.capacity) {
return null; // capacity not reached, do not swap and return nothing
// no retry here because the write array will not change until something written into it
}
// also we should check whether there are no elements being stored
if (writeEnd != writeEndIndicator) { // element is being stored into the array
if (write.storing.get() != 0) { // element is being stored into the array
return poll(retry + 1); // try again
}
// now we can know that writing operations will not happen in this partition
Expand All @@ -168,56 +148,61 @@ public int size() {
private static class StorageArray<E> {
private final int capacity;
private final AtomicReferenceArray<E> array;
private final AtomicInteger start = new AtomicInteger(0);
private final AtomicInteger endIndicator = new AtomicInteger(0);
private final AtomicInteger end = new AtomicInteger(0);
private final AtomicInteger start = new AtomicInteger(-1);
private final AtomicInteger end = new AtomicInteger(-1);
private final AtomicInteger storing = new AtomicInteger(0);

private StorageArray(int capacity) {
this.capacity = capacity;
this.array = new AtomicReferenceArray<>(capacity);
}

boolean add(E e) {
if (end.get() >= capacity || endIndicator.get() >= capacity) {
storing.incrementAndGet();

if (end.get() >= capacity) {
storing.decrementAndGet();
return false; // exceeds capacity
}
int index = endIndicator.getAndIncrement();
// it could still have concurrency between the capacity check and actual $end increment or $endIndicator increment
int index = end.incrementAndGet();
if (index < capacity) {
// storing should succeed
array.set(index, e);
// increase $end after element actually stored
end.getAndIncrement();
storing.decrementAndGet();
return true;
} else {
// storing should fail
// decrease the endIndicator
endIndicator.getAndDecrement();
// storing failed
storing.decrementAndGet();
return false;
}
}

E poll() {
if (start.get() >= end.get()) {
return null; // no elements to retrieve
if (start.get() + 1 >= end.get() || start.get() + 1 >= capacity) {
return null;
}
int idx = start.getAndIncrement();
if (idx >= end.get()) {
int idx = start.incrementAndGet();
if (idx >= end.get() || idx >= capacity) {
return null; // concurrent polling
}
return array.get(idx);
}

int size() {
int n = endIndicator.get() - start.get();
//noinspection ManualMinMaxCalculation
return n < 0 ? 0 : n;
int start = this.start.get() + 1;
if (start > capacity) {
return 0;
}
int cap = end.get();
if (cap > capacity) {
cap = capacity;
}
return cap - start;
}

void reset() {
end.set(0);
endIndicator.set(0);
start.set(0);
end.set(-1);
start.set(-1);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,9 @@ void utilWriteData(BackendConnectionHandler.ByteFlow flow,
} else {
assert Logger.lowLevelDebug("choose to run without zero copy");

targetConnection.runNoQuickWrite(() -> {
targetConnection.runNoQuickWrite(c -> {
int n = sourceConnection.getInBuffer()
.writeTo(targetConnection.getOutBuffer(), flow.currentSegment.bytesToProxy);
.writeTo(c.getOutBuffer(), flow.currentSegment.bytesToProxy);
flow.currentSegment.bytesToProxy -= n;
assert Logger.lowLevelDebug("proxied " + n + " bytes, still have " + flow.currentSegment.bytesToProxy + " left");
});
Expand All @@ -111,8 +111,8 @@ void utilWriteData(BackendConnectionHandler.ByteFlow flow,
} else {
assert flow.currentSegment.chnl != null;
assert Logger.lowLevelDebug("sending bytes, flow.chnl.used = " + flow.currentSegment.chnl.used());
targetConnection.runNoQuickWrite(() ->
targetConnection.getOutBuffer().storeBytesFrom(flow.currentSegment.chnl));
targetConnection.runNoQuickWrite(c ->
c.getOutBuffer().storeBytesFrom(flow.currentSegment.chnl));
// check whether this batch sending is done
assert Logger.lowLevelDebug("now flow.chnl.used == " + flow.currentSegment.chnl.used());
if (flow.currentSegment.chnl.used() == 0) {
Expand Down
27 changes: 13 additions & 14 deletions extended/src/main/java/io/vproxy/vproxyx/ProxyNexus.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import io.vproxy.pni.array.IntArray;
import io.vproxy.vfd.IPPort;
import io.vproxy.vproxyx.nexus.*;
import io.vproxy.vproxyx.nexus.entity.ConnectInfo;
import io.vproxy.vproxyx.nexus.entity.PeerAddressInfo;
import io.vproxy.vproxyx.uot.FromTcpToUdp;
import io.vproxy.vproxyx.uot.FromUdpToTcp;

Expand Down Expand Up @@ -62,7 +62,7 @@ public static void main0(String[] args) throws Exception {
int serverPort = 0;
String loadPath = null;
var existingConnectTargets = new HashSet<IPPort>();
var connect = new ArrayList<ConnectInfo>();
var connect = new ArrayList<PeerAddressInfo>();
var certificatePath = "";
var privateKeyPath = "";
var cacertPath = "";
Expand Down Expand Up @@ -103,22 +103,22 @@ public static void main0(String[] args) throws Exception {
int uotPort = 0;
if (s.startsWith("uot:")) {
s = s.substring("uot:".length());
}
if (s.contains(":")) {
var uotPortStr = s.substring(s.indexOf(":"));
if (!Utils.isPortInteger(uotPortStr)) {
throw new IllegalArgumentException(uotPortStr + " is not a valid port");
if (s.contains(":")) {
var uotPortStr = s.substring(0, s.indexOf(":"));
if (!Utils.isPortInteger(uotPortStr)) {
throw new IllegalArgumentException(uotPortStr + " is not a valid port");
}
uotPort = Integer.parseInt(uotPortStr);
s = s.substring(s.indexOf(":") + 1);
} else {
throw new IllegalArgumentException("uot listening port must be specified by `uot:<port>:<...>`");
}
uotPort = Integer.parseInt(uotPortStr);
s = s.substring(s.indexOf(":") + 1);
} else {
throw new IllegalArgumentException("uot listening port must be specified by `uot:<port>:<...>`");
}
if (!IPPort.validL4AddrStr(s)) {
throw new IllegalArgumentException(s + " is not valid ipport in `connect`");
}
var ipport = new IPPort(s);
var info = new ConnectInfo(ipport, uotPort);
var info = new PeerAddressInfo(ipport, uotPort);
if (existingConnectTargets.contains(ipport)) {
throw new IllegalArgumentException(s + " is already specified in `connect`");
}
Expand Down Expand Up @@ -244,10 +244,9 @@ public static void main0(String[] args) throws Exception {
new FromUdpToTcp(loop, uotIPPort, ipport).start();
Logger.alert("uot udp->tcp server listens on " + uotIPPort.formatToIPPortString() +
", will redirect traffic to " + ipport.formatToIPPortString());
ipport = uotIPPort;
}

var peer = NexusPeer.create(nctx, ipport);
var peer = NexusPeer.create(nctx, target);
peer.start();
}

Expand Down
Loading

0 comments on commit 8da83e3

Please sign in to comment.