Skip to content

Commit

Permalink
Add StreamUpgrader to relayed streams.
Browse files Browse the repository at this point in the history
  • Loading branch information
ianopolous committed Nov 24, 2023
1 parent dc3ae61 commit 6f4834c
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 20 deletions.
5 changes: 1 addition & 4 deletions src/main/java/org/peergos/EmbeddedIpfs.java
Original file line number Diff line number Diff line change
Expand Up @@ -199,20 +199,17 @@ public static EmbeddedIpfs build(RecordStore records,
Multihash ourPeerId = Multihash.deserialize(builder.getPeerId().getBytes());

Kademlia dht = new Kademlia(new KademliaEngine(ourPeerId, providers, records, blockstore), false);
CircuitStopProtocol.Binding stop = new CircuitStopProtocol.Binding();
CircuitHopProtocol.RelayManager relayManager = CircuitHopProtocol.RelayManager.limitTo(builder.getPrivateKey(), ourPeerId, 5);
Bitswap bitswap = new Bitswap(new BitswapEngine(blockstore, authoriser, Bitswap.MAX_MESSAGE_SIZE, true));
Optional<HttpProtocol.Binding> httpHandler = handler.map(HttpProtocol.Binding::new);

List<ProtocolBinding> protocols = new ArrayList<>();
protocols.add(new Ping());
protocols.add(new AutonatProtocol.Binding());
protocols.add(new CircuitHopProtocol.Binding(relayManager, stop));
protocols.add(bitswap);
protocols.add(dht);
httpHandler.ifPresent(protocols::add);

Host node = builder.addProtocols(protocols).build();
Host node = builder.addProtocols(protocols).enableRelay().build();

Optional<BlockingDeque<Cid>> newBlockProvider = provideBlocks ?
Optional.of(((ProvidingBlockstore)blockstore).toPublish) :
Expand Down
52 changes: 48 additions & 4 deletions src/main/java/org/peergos/HostBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,32 @@
import io.ipfs.multiaddr.*;
import io.ipfs.multihash.Multihash;
import io.libp2p.core.*;
import io.libp2p.core.Stream;
import io.libp2p.core.crypto.*;
import io.libp2p.core.dsl.*;
import io.libp2p.core.multiformats.*;
import io.libp2p.core.multistream.*;
import io.libp2p.core.mux.*;
import io.libp2p.core.security.*;
import io.libp2p.crypto.keys.*;
import io.libp2p.etc.types.*;
import io.libp2p.etc.util.netty.*;
import io.libp2p.multistream.*;
import io.libp2p.protocol.*;
import io.libp2p.security.noise.*;
import io.libp2p.security.tls.*;
import io.libp2p.transport.*;
import io.libp2p.transport.implementation.*;
import io.libp2p.transport.tcp.*;
import io.libp2p.core.crypto.KeyKt;
import org.peergos.blockstore.*;
import org.peergos.protocol.autonat.*;
import org.peergos.protocol.bitswap.*;
import org.peergos.protocol.circuit.*;
import org.peergos.protocol.dht.*;

import java.time.*;
import java.time.temporal.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
Expand Down Expand Up @@ -113,6 +122,44 @@ public static HostBuilder create(int listenPort,
return create(listenPort, providers, records, blocks, authoriser, false);
}

private void upgradeConnection(Stream s, Multihash peerid, int durationSeconds, long limitBytes) {
s.pushHandler(new InboundTrafficLimitHandler(limitBytes));
s.pushHandler(new TotalTimeoutHandler(Duration.of(durationSeconds, ChronoUnit.SECONDS)));

List<? extends ProtocolBinding<?>> bindings = protocols.stream()
.map(b -> (ProtocolBinding<?>)b)
.collect(Collectors.toList());
List<StreamMuxer> theMuxers = muxers.stream()
.map(m -> m.createMuxer(new MultistreamProtocolDebugV1(), bindings))
.collect(Collectors.toList());
List<SecureChannel> sec = List.of(new NoiseXXSecureChannel(privKey, theMuxers));
ConnectionUpgrader upgrader = new ConnectionUpgrader(
new MultistreamProtocolDebugV1(), sec,
new MultistreamProtocolDebugV1(), theMuxers);
ConnectionHandler handler = null;//new MultistreamProtocolDebugV1().createMultistream(bindings).toStreamHandler();
ConnectionBuilder connBuilder = new ConnectionBuilder(null, upgrader, handler,
false, PeerId.fromBase58(peerid.toBase58()), null);

s.getConnection().pushHandler(connBuilder);
connBuilder.getConnectionEstablished().thenApply(c -> c.muxerSession().createStream(bindings));

/* upgrader.establishSecureChannel(conn).thenCompose(sess -> {
// conn.setSecureSession(sess);
if (sess.getEarlyMuxer() != null) {
return upgrader.establishMuxer(sess.getEarlyMuxer(), conn);
} else
return upgrader.establishMuxer(conn);
});*/
}

public HostBuilder enableRelay() {
CircuitStopProtocol.Binding stop = new CircuitStopProtocol.Binding(this::upgradeConnection);
Multihash ourPeerId = Multihash.deserialize(peerId.getBytes());
CircuitHopProtocol.RelayManager relayManager = CircuitHopProtocol.RelayManager.limitTo(privKey, ourPeerId, 5);
addProtocols(List.of(stop, new CircuitHopProtocol.Binding(relayManager, stop)));
return this;
}

public static HostBuilder create(int listenPort,
ProviderStore providers,
RecordStore records,
Expand All @@ -124,13 +171,10 @@ public static HostBuilder create(int listenPort,
.listen(List.of(new MultiAddress("/ip4/0.0.0.0/tcp/" + listenPort)));
Multihash ourPeerId = Multihash.deserialize(builder.peerId.getBytes());
Kademlia dht = new Kademlia(new KademliaEngine(ourPeerId, providers, records, blocks), false);
CircuitStopProtocol.Binding stop = new CircuitStopProtocol.Binding();
CircuitHopProtocol.RelayManager relayManager = CircuitHopProtocol.RelayManager.limitTo(builder.privKey, ourPeerId, 5);

return builder.addProtocols(List.of(
new Ping(),
new AutonatProtocol.Binding(),
stop,
new CircuitHopProtocol.Binding(relayManager, stop),
new Bitswap(new BitswapEngine(blocks, authoriser, Bitswap.MAX_MESSAGE_SIZE, blockAggressivePeers)),
dht));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
public class CircuitStopProtocol extends ProtobufProtocolHandler<CircuitStopProtocol.StopController> {

public static class Binding extends StrictProtocolBinding<CircuitStopProtocol.StopController> {
public Binding() {
super("/libp2p/circuit/relay/0.2.0/stop", new CircuitStopProtocol());
public Binding(StreamUpgrader upgrader) {
super("/libp2p/circuit/relay/0.2.0/stop", new CircuitStopProtocol(upgrader));
}
}

Expand Down Expand Up @@ -63,9 +63,11 @@ public Stream getStream() {

public static class Receiver implements ProtocolMessageHandler<Circuit.StopMessage>, StopController {
private final Stream stream;
private final StreamUpgrader upgrader;

public Receiver(Stream stream) {
public Receiver(Stream stream, StreamUpgrader upgrader) {
this.stream = stream;
this.upgrader = upgrader;
}

@Override
Expand All @@ -77,8 +79,9 @@ public void onMessage(@NotNull Stream stream, Circuit.StopMessage msg) {
stream.writeAndFlush(Circuit.StopMessage.newBuilder()
.setType(Circuit.StopMessage.Type.STATUS).setStatus(Circuit.Status.OK)
.build());
// TODO: now upgrade connection with security and muxer protocol

// now upgrade connection with security and muxer protocol
System.out.println("Upgrading relayed incoming connection..");
upgrader.upgrade(stream, targetPeerId, durationSeconds, limitBytes);
}
}

Expand All @@ -93,8 +96,11 @@ public CompletableFuture<Circuit.StopMessage> rpc(Circuit.StopMessage msg) {

private static final int TRAFFIC_LIMIT = 2*1024;

public CircuitStopProtocol() {
private final StreamUpgrader upgrader;

public CircuitStopProtocol(StreamUpgrader upgrader) {
super(Circuit.StopMessage.getDefaultInstance(), TRAFFIC_LIMIT, TRAFFIC_LIMIT);
this.upgrader = upgrader;
}

@NotNull
Expand All @@ -108,8 +114,7 @@ protected CompletableFuture<StopController> onStartInitiator(@NotNull Stream str
@NotNull
@Override
protected CompletableFuture<StopController> onStartResponder(@NotNull Stream stream) {
System.out.println("circuit.stop::onStartResponder");
Receiver acceptor = new Receiver(stream);
Receiver acceptor = new Receiver(stream, upgrader);
stream.pushHandler(acceptor);
return CompletableFuture.completedFuture(acceptor);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package org.peergos.protocol.circuit;

import io.ipfs.multihash.*;
import io.libp2p.core.*;

public interface StreamUpgrader {
void upgrade(Stream s, Multihash targetPeerid, int durationSeconds, long limitBytes);
}
19 changes: 15 additions & 4 deletions src/test/java/org/peergos/RelayTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import io.libp2p.core.*;
import io.libp2p.core.Stream;
import io.libp2p.core.multiformats.*;
import io.libp2p.multistream.*;
import io.libp2p.protocol.*;
import org.junit.*;
import org.peergos.blockstore.*;
import org.peergos.protocol.*;
Expand Down Expand Up @@ -56,21 +58,27 @@ public void remoteRelay() {
}

@Test
public void localRelay() {
public void localRelay() throws Exception {
HostBuilder builder1 = HostBuilder.create(10000 + new Random().nextInt(50000),
new RamProviderStore(), new RamRecordStore(), new RamBlockstore(), (c, b, p, a) -> CompletableFuture.completedFuture(true));
new RamProviderStore(10_000), new RamRecordStore(), new RamBlockstore(),
(c, p, a) -> CompletableFuture.completedFuture(true))
.enableRelay();
Host sender = builder1.build();
sender.start().join();
IdentifyBuilder.addIdentifyProtocol(sender);

HostBuilder builder2 = HostBuilder.create(10000 + new Random().nextInt(50000),
new RamProviderStore(), new RamRecordStore(), new RamBlockstore(), (c, b, p, a) -> CompletableFuture.completedFuture(true));
new RamProviderStore(10_000), new RamRecordStore(), new RamBlockstore(),
(c, p, a) -> CompletableFuture.completedFuture(true))
.enableRelay();
Host receiver = builder2.build();
receiver.start().join();
IdentifyBuilder.addIdentifyProtocol(receiver);

HostBuilder relayBuilder = HostBuilder.create(10000 + new Random().nextInt(50000),
new RamProviderStore(), new RamRecordStore(), new RamBlockstore(), (c, b, p, a) -> CompletableFuture.completedFuture(true));
new RamProviderStore(10_000), new RamRecordStore(), new RamBlockstore(),
(c, p, a) -> CompletableFuture.completedFuture(true))
.enableRelay();
Host relay = relayBuilder.build();
relay.start().join();
IdentifyBuilder.addIdentifyProtocol(relay);
Expand All @@ -85,6 +93,9 @@ public void localRelay() {
System.out.println("Using relay " + relay.getPeerId());
CircuitHopProtocol.HopController node1Hop = builder1.getRelayHop().get().dial(sender, relayAddr).getController().join();
Stream stream = node1Hop.connect(Multihash.deserialize(receiver.getPeerId().getBytes())).join();
StreamHandler<PingController> pingHandler = new MultistreamProtocolDebugV1().createMultistream(List.of(new Ping())).toStreamHandler();
pingHandler.handleStream(stream);
Thread.sleep(10_000);
System.out.println();
} finally {
sender.stop();
Expand Down

0 comments on commit 6f4834c

Please sign in to comment.