diff --git a/src/main/java/org/peergos/EmbeddedIpfs.java b/src/main/java/org/peergos/EmbeddedIpfs.java index e0cb0541..276b8232 100644 --- a/src/main/java/org/peergos/EmbeddedIpfs.java +++ b/src/main/java/org/peergos/EmbeddedIpfs.java @@ -4,6 +4,7 @@ import io.ipfs.multiaddr.*; import io.ipfs.multihash.Multihash; import io.libp2p.core.*; +import io.libp2p.core.crypto.*; import io.libp2p.core.multiformats.*; import io.libp2p.core.multistream.*; import io.libp2p.protocol.*; @@ -22,12 +23,14 @@ import org.peergos.protocol.circuit.*; import org.peergos.protocol.dht.*; import org.peergos.protocol.http.*; +import org.peergos.protocol.ipns.*; import org.peergos.util.Logging; import java.nio.file.*; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; +import java.time.*; import java.util.*; import java.util.concurrent.*; import java.util.logging.*; @@ -98,6 +101,27 @@ public List getBlocks(List wants, Set peers, boolean .collect(Collectors.toList()); } + public CompletableFuture publishValue(PrivKey priv, byte[] value, long sequence, int hoursTtl) { + Multihash pub = Multihash.deserialize(PeerId.fromPubKey(priv.publicKey()).getBytes()); + LocalDateTime expiry = LocalDateTime.now().plusHours(hoursTtl); + long ttlNanos = hoursTtl * 3600_000_000_000L; + byte[] signedRecord = IPNS.createSignedRecord(value, expiry, sequence, ttlNanos, priv); + return dht.publishValue(pub, signedRecord, node); + } + + public CompletableFuture publishPresignedRecord(Multihash pub, byte[] presignedRecord) { + return dht.publishValue(pub, presignedRecord, node); + } + + public CompletableFuture resolveValue(PubKey pub, int minResults) { + Multihash publisher = Multihash.deserialize(PeerId.fromPubKey(pub).getBytes()); + List candidates = dht.resolveValue(publisher, minResults, node); + List records = candidates.stream().sorted().collect(Collectors.toList()); + if (records.isEmpty()) + return CompletableFuture.failedFuture(new IllegalStateException("Couldn't resolve IPNS value for " + pub)); + return CompletableFuture.completedFuture(records.get(records.size() - 1).value); + } + public void start() { LOG.info("Starting IPFS..."); Thread shutdownHook = new Thread(() -> { diff --git a/src/main/java/org/peergos/protocol/dht/DatabaseRecordStore.java b/src/main/java/org/peergos/protocol/dht/DatabaseRecordStore.java index b5e65930..9c4f43d9 100644 --- a/src/main/java/org/peergos/protocol/dht/DatabaseRecordStore.java +++ b/src/main/java/org/peergos/protocol/dht/DatabaseRecordStore.java @@ -10,7 +10,7 @@ import java.sql.*; import java.time.LocalDateTime; import java.time.ZoneOffset; -import java.util.Optional; +import java.util.*; public class DatabaseRecordStore implements RecordStore { @@ -61,7 +61,7 @@ private String hashToKey(Multihash hash) { } @Override - public Optional get(Cid peerId) { + public Optional get(Multihash peerId) { String selectSQL = "SELECT raw, sequence, ttlNanos, expiryUTC, val FROM " + RECORD_TABLE + " WHERE peerId=?"; try (PreparedStatement pstmt = connection.prepareStatement(selectSQL)) { pstmt.setString(1, hashToKey(peerId)); @@ -76,7 +76,7 @@ public Optional get(Cid peerId) { LocalDateTime expiry = LocalDateTime.ofEpochSecond(rs.getLong("expiryUTC"), 0, ZoneOffset.UTC); IpnsRecord record = new IpnsRecord(bout.toByteArray(), rs.getLong("sequence"), - rs.getLong("ttlNanos"), expiry, rs.getString("val")); + rs.getLong("ttlNanos"), expiry, rs.getString("val").getBytes()); return Optional.of(record); } catch (IOException readEx) { throw new IllegalStateException(readEx); @@ -102,8 +102,8 @@ public void put(Multihash peerId, IpnsRecord record) { pstmt.setLong(3, record.sequence); pstmt.setLong(4, record.ttlNanos); pstmt.setLong(5, record.expiry.toEpochSecond(ZoneOffset.UTC)); - pstmt.setString(6, record.value.length() > SIZE_OF_VAL ? - record.value.substring(0, SIZE_OF_VAL) : record.value); + pstmt.setString(6, new String(record.value.length > SIZE_OF_VAL ? + Arrays.copyOfRange(record.value, 0, SIZE_OF_VAL) : record.value)); pstmt.executeUpdate(); } catch (SQLException ex) { throw new IllegalStateException(ex); diff --git a/src/main/java/org/peergos/protocol/dht/Kademlia.java b/src/main/java/org/peergos/protocol/dht/Kademlia.java index 47e13716..6c48e221 100644 --- a/src/main/java/org/peergos/protocol/dht/Kademlia.java +++ b/src/main/java/org/peergos/protocol/dht/Kademlia.java @@ -55,18 +55,17 @@ public int bootstrapRoutingTable(Host host, List addrs, Predicate< }) .filter(filter) .collect(Collectors.toList()); - List> futures = resolved.stream() - .parallel() + List> futures = resolved.stream() .map(addr -> { Multiaddr addrWithPeer = Multiaddr.fromString(addr); addressBook.setAddrs(addrWithPeer.getPeerId(), 0, addrWithPeer); - return dial(host, addrWithPeer).getController(); + return ioExec.submit(() -> dial(host, addrWithPeer).getController().join()); }) .collect(Collectors.toList()); int successes = 0; - for (CompletableFuture future : futures) { + for (Future future : futures) { try { - future.orTimeout(5, TimeUnit.SECONDS).join(); + future.get(5, TimeUnit.SECONDS); successes++; } catch (Exception e) {} } @@ -133,17 +132,24 @@ private int compareKeys(RoutingEntry a, RoutingEntry b, Id keyId) { return a.addresses.peerId.toBase58().compareTo(b.addresses.peerId.toBase58()); } + private final ExecutorService ioExec = Executors.newFixedThreadPool(16); + public List findClosestPeers(Multihash peerIdkey, int maxCount, Host us) { + if (maxCount == 1) { + Collection existing = addressBook.get(PeerId.fromBase58(peerIdkey.toBase58())).join(); + if (!existing.isEmpty()) + return Collections.singletonList(new PeerAddresses(peerIdkey, new ArrayList<>(existing))); + } byte[] key = peerIdkey.toBytes(); + return findClosestPeers(key, maxCount, us); + } + public List findClosestPeers(byte[] key, int maxCount, Host us) { Id keyId = Id.create(Hash.sha256(key), 256); SortedSet closest = Collections.synchronizedSortedSet(new TreeSet<>((a, b) -> compareKeys(a, b, keyId))); SortedSet toQuery = Collections.synchronizedSortedSet(new TreeSet<>((a, b) -> compareKeys(a, b, keyId))); List localClosest = engine.getKClosestPeers(key); if (maxCount == 1) { - Collection existing = addressBook.get(PeerId.fromBase58(peerIdkey.toBase58())).join(); - if (! existing.isEmpty()) - return Collections.singletonList(new PeerAddresses(peerIdkey, new ArrayList<>(existing))); - Optional match = localClosest.stream().filter(p -> p.peerId.equals(peerIdkey)).findFirst(); + Optional match = localClosest.stream().filter(p -> Arrays.equals(p.peerId.toBytes(), key)).findFirst(); if (match.isPresent()) return Collections.singletonList(match.get()); } @@ -154,23 +160,24 @@ public List findClosestPeers(Multihash peerIdkey, int maxCount, H Set queried = Collections.synchronizedSet(new HashSet<>()); int queryParallelism = 3; while (true) { - List>> futures = toQuery.stream() + List thisRound = toQuery.stream() .limit(queryParallelism) - .parallel() + .collect(Collectors.toList()); + List>> futures = thisRound.stream() .map(r -> { toQuery.remove(r); queried.add(r.addresses.peerId); - return getCloserPeers(peerIdkey, r.addresses, us); + return ioExec.submit(() -> getCloserPeers(key, r.addresses, us).join()); }) .collect(Collectors.toList()); boolean foundCloser = false; - for (CompletableFuture> future : futures) { + for (Future> future : futures) { try { - List result = future.join(); + List result = future.get(); for (PeerAddresses peer : result) { if (!queried.contains(peer.peerId)) { // exit early if we are looking for the specific node - if (maxCount == 1 && peer.peerId.equals(peerIdkey)) + if (maxCount == 1 && Arrays.equals(peer.peerId.toBytes(), key)) return Collections.singletonList(peer); queried.add(peer.peerId); Id peerKey = Id.create(Hash.sha256(peer.peerId.toBytes()), 256); @@ -250,17 +257,17 @@ public CompletableFuture> findProviders(Multihash block, Hos return CompletableFuture.completedFuture(providers); } - private CompletableFuture> getCloserPeers(Multihash peerIDKey, PeerAddresses target, Host us) { + private CompletableFuture> getCloserPeers(byte[] key, PeerAddresses target, Host us) { try { - return dialPeer(target, us).orTimeout(2, TimeUnit.SECONDS).join().closerPeers(peerIDKey); + return dialPeer(target, us).orTimeout(2, TimeUnit.SECONDS).join().closerPeers(key); } catch (Exception e) { // we can't dial quic only nodes until it's implemented if (target.addresses.stream().allMatch(a -> a.toString().contains("quic"))) return CompletableFuture.completedFuture(Collections.emptyList()); if (e.getCause() instanceof NothingToCompleteException || e.getCause() instanceof NonCompleteException) { - LOG.fine("Couldn't dial " + peerIDKey + " addrs: " + target.addresses); + LOG.fine("Couldn't dial " + target.peerId + " addrs: " + target.addresses); } else if (e.getCause() instanceof TimeoutException) - LOG.fine("Timeout dialing " + peerIDKey + " addrs: " + target.addresses); + LOG.fine("Timeout dialing " + target.peerId + " addrs: " + target.addresses); else if (e.getCause() instanceof ConnectionClosedException) {} else e.printStackTrace(); @@ -297,49 +304,180 @@ public CompletableFuture provideBlock(Multihash block, Host us, PeerAddres return CompletableFuture.allOf(provides.toArray(new CompletableFuture[0])); } - public CompletableFuture publishIpnsValue(PrivKey priv, Multihash publisher, Multihash value, long sequence, Host us) { + public CompletableFuture publishIpnsValue(PrivKey priv, + Multihash publisher, + Multihash value, + long sequence, + Host us) { int hours = 1; LocalDateTime expiry = LocalDateTime.now().plusHours(hours); - long ttl = hours * 3600_000_000_000L; + long ttlNanos = hours * 3600_000_000_000L; + byte[] publishValue = ("/ipfs/" + value).getBytes(); + byte[] signedRecord = IPNS.createSignedRecord(publishValue, expiry, sequence, ttlNanos, priv); + return publishValue(publisher, signedRecord, us); + } + + private boolean putValue(Multihash publisher, + byte[] signedRecord, + PeerAddresses peer, + Host us) { + try { + return dialPeer(peer, us).join() + .putValue(publisher, signedRecord).join(); + } catch (Exception e) {} + return false; + } + + public CompletableFuture publishValue(Multihash publisher, + byte[] signedRecord, + Host us) { + byte[] key = IPNS.getKey(publisher); + Optional parsed = IPNS.parseAndValidateIpnsEntry(key, signedRecord); + if (parsed.isEmpty() || !parsed.get().publisher.equals(publisher)) + throw new IllegalStateException("Tried to publish invalid INS record for " + publisher); + Optional existing = engine.getRecord(publisher); + // don't overwrite 'newer' record + if (existing.isEmpty() || parsed.get().value.compareTo(existing.get()) > 0) { + engine.addRecord(publisher, parsed.get().value); + } + + Set publishes = Collections.synchronizedSet(new HashSet<>()); + int minPublishes = 20; - int publishes = 0; - for (int i=0; i < 5 && publishes < 20; i++) { - List closestPeers = findClosestPeers(publisher, 25, us); - publishes += closestPeers.stream().parallel().mapToInt(peer -> { + Id keyId = Id.create(Hash.sha256(key), 256); + SortedSet toQuery = new TreeSet<>((a, b) -> compareKeys(a, b, keyId)); + List localClosest = engine.getKClosestPeers(key); + int queryParallelism = 3; + toQuery.addAll(localClosest.stream() + .limit(queryParallelism) + .map(p -> new RoutingEntry(Id.create(Hash.sha256(p.peerId.toBytes()), 256), p)) + .collect(Collectors.toList())); + Set queried = Collections.synchronizedSet(new HashSet<>()); + while (! toQuery.isEmpty()) { + int remaining = toQuery.size() - 3; + List thisRound = toQuery.stream() + .limit(queryParallelism) + .collect(Collectors.toList()); + List>> futures = thisRound.stream() + .map(r -> { + toQuery.remove(r); + queried.add(r.addresses.peerId); + return ioExec.submit(() -> getCloserPeers(key, r.addresses, us).thenApply(res -> { + List more = new ArrayList<>(); + for (PeerAddresses peer : res) { + if (! queried.contains(peer.peerId)) { + Id peerKey = Id.create(Hash.sha256(IPNS.getKey(peer.peerId)), 256); + RoutingEntry e = new RoutingEntry(peerKey, peer); + more.add(e); + } + } + ioExec.submit(() -> { + if (putValue(publisher, signedRecord, r.addresses, us)) + publishes.add(r.addresses.peerId); + }); + return more; + }).join()); + }) + .collect(Collectors.toList()); + futures.forEach(f -> { try { - boolean success = dialPeer(peer, us).join() - .putValue("/ipfs/" + value, expiry, sequence, - ttl, publisher, priv).join(); - if (success) - return 1; + toQuery.addAll(f.get()); } catch (Exception e) {} - return 0; - }).sum(); + }); + // exit early if we have enough results + if (publishes.size() >= minPublishes) + break; + if (toQuery.size() == remaining) { + // publish to closest remaining nodes + while (publishes.size() < minPublishes) { + List closest = toQuery.stream() + .limit(minPublishes - publishes.size() + 5) + .collect(Collectors.toList()); + List> lastFutures = closest.stream() + .map(r -> { + toQuery.remove(r); + queried.add(r.addresses.peerId); + return ioExec.submit(() -> { + if (putValue(publisher, signedRecord, r.addresses, us)) + publishes.add(r.addresses.peerId); + }); + }) + .collect(Collectors.toList()); + lastFutures.forEach(f -> { + try { + f.get(); + } catch (Exception e) {} + }); + } + break; + } } return CompletableFuture.completedFuture(null); } public CompletableFuture resolveIpnsValue(Multihash publisher, Host us, int minResults) { - List closestPeers = findClosestPeers(publisher, 20, us); + List candidates = resolveValue(publisher, minResults, us); + List records = candidates.stream().sorted().collect(Collectors.toList()); + if (records.isEmpty()) + return CompletableFuture.failedFuture(new IllegalStateException("Couldn't find IPNS value for " + publisher)); + return CompletableFuture.completedFuture(new String(records.get(records.size() - 1).value)); + } + + private Optional getValueFromPeer(PeerAddresses peer, Multihash publisher, Host us) { + try { + return Optional.of(dialPeer(peer, us).join().getValue(publisher).join()); + } catch (Exception e) {} + return Optional.empty(); + } + public List resolveValue(Multihash publisher, int minResults, Host us) { + byte[] key = IPNS.getKey(publisher); List candidates = new ArrayList<>(); - Set queryCandidates = new HashSet<>(); - Set queriedPeers = new HashSet<>(); - for (PeerAddresses peer : closestPeers) { - if (queriedPeers.contains(peer.peerId)) - continue; - queriedPeers.add(peer.peerId); - try { - GetResult res = dialPeer(peer, us).join().getValue(publisher).join(); - if (res.record.isPresent() && res.record.get().publisher.equals(publisher)) - candidates.add(res.record.get().value); - queryCandidates.addAll(res.closerPeers); - } catch (Exception e) {} + Optional local = engine.getRecord(publisher); + local.ifPresent(candidates::add); + + Id keyId = Id.create(Hash.sha256(key), 256); + SortedSet toQuery = Collections.synchronizedSortedSet(new TreeSet<>((a, b) -> compareKeys(a, b, keyId))); + List localClosest = engine.getKClosestPeers(key); + int queryParallelism = 3; + toQuery.addAll(localClosest.stream() + .limit(queryParallelism) + .map(p -> new RoutingEntry(Id.create(Hash.sha256(p.peerId.toBytes()), 256), p)) + .collect(Collectors.toList())); + Set queried = Collections.synchronizedSet(new HashSet<>()); + while (! toQuery.isEmpty()) { + int remaining = toQuery.size() - 3; + List thisRound = toQuery.stream() + .limit(queryParallelism) + .collect(Collectors.toList()); + List> futures = thisRound.stream() + .map(r -> { + toQuery.remove(r); + queried.add(r.addresses.peerId); + return ioExec.submit(() -> getValueFromPeer(r.addresses, publisher, us) + .ifPresent(g -> { + if (g.record.isPresent() && g.record.get().publisher.equals(publisher)) + candidates.add(g.record.get().value); + for (PeerAddresses peer : g.closerPeers) { + if (! queried.contains(peer.peerId)) { + Id peerKey = Id.create(Hash.sha256(IPNS.getKey(peer.peerId)), 256); + RoutingEntry e = new RoutingEntry(peerKey, peer); + toQuery.add(e); + } + } + })); + }) + .collect(Collectors.toList()); + futures.forEach(f -> { + try { + f.get(); + } catch (Exception e) {} + }); + // exit early if we have enough results if (candidates.size() >= minResults) break; + if (toQuery.size() == remaining) + break; } - - // Validate and sort records by sequence number - List records = candidates.stream().sorted().collect(Collectors.toList()); - return CompletableFuture.completedFuture(records.get(records.size() - 1).value); + return candidates; } } diff --git a/src/main/java/org/peergos/protocol/dht/KademliaController.java b/src/main/java/org/peergos/protocol/dht/KademliaController.java index da17a78f..98e4fbb0 100644 --- a/src/main/java/org/peergos/protocol/dht/KademliaController.java +++ b/src/main/java/org/peergos/protocol/dht/KademliaController.java @@ -20,10 +20,10 @@ public interface KademliaController { CompletableFuture send(Dht.Message msg); - default CompletableFuture> closerPeers(Multihash peerID) { + default CompletableFuture> closerPeers(byte[] key) { return rpc(Dht.Message.newBuilder() .setType(Dht.Message.MessageType.FIND_NODE) - .setKey(ByteString.copyFrom(peerID.toBytes())) + .setKey(ByteString.copyFrom(key)) .build()) .thenApply(resp -> resp.getCloserPeersList().stream() .map(PeerAddresses::fromProtobuf) @@ -47,31 +47,6 @@ default CompletableFuture getProviders(Multihash block) { .thenApply(Providers::fromProtobuf); } - default CompletableFuture putValue(String pathToPublish, LocalDateTime expiry, long sequence, - long ttlNanos, Multihash peerId, PrivKey ourKey) { - byte[] cborEntryData = IPNS.createCborDataForIpnsEntry(pathToPublish, expiry, - Ipns.IpnsEntry.ValidityType.EOL_VALUE, sequence, ttlNanos); - String expiryString = IPNS.formatExpiry(expiry); - byte[] signature = ourKey.sign(IPNS.createSigV2Data(cborEntryData)); - PubKey pubKey = ourKey.publicKey(); - byte[] pubKeyProtobuf = Crypto.PublicKey.newBuilder() - .setType(pubKey.getKeyType()) - .setData(ByteString.copyFrom(pubKey.raw())) - .build() - .toByteArray(); - byte[] ipnsEntry = Ipns.IpnsEntry.newBuilder() - .setSequence(sequence) - .setTtl(ttlNanos) - .setValue(ByteString.copyFrom(pathToPublish.getBytes())) - .setValidityType(Ipns.IpnsEntry.ValidityType.EOL) - .setValidity(ByteString.copyFrom(expiryString.getBytes())) - .setData(ByteString.copyFrom(cborEntryData)) - .setSignatureV2(ByteString.copyFrom(signature)) - .setPubKey(ByteString.copyFrom(pubKeyProtobuf)) // not needed with Ed25519 - .build().toByteArray(); - return putValue(peerId, ipnsEntry); - } - default CompletableFuture putValue(Multihash peerId, byte[] value) { byte[] ipnsRecordKey = IPNS.getKey(peerId); Dht.Message outgoing = Dht.Message.newBuilder() diff --git a/src/main/java/org/peergos/protocol/dht/KademliaEngine.java b/src/main/java/org/peergos/protocol/dht/KademliaEngine.java index eaba4464..4c16c448 100644 --- a/src/main/java/org/peergos/protocol/dht/KademliaEngine.java +++ b/src/main/java/org/peergos/protocol/dht/KademliaEngine.java @@ -3,7 +3,6 @@ import com.google.protobuf.*; import com.offbynull.kademlia.*; import io.ipfs.cid.*; -import io.ipfs.multiaddr.*; import io.ipfs.multihash.Multihash; import io.libp2p.core.*; import io.libp2p.core.Stream; @@ -97,12 +96,25 @@ public List getKClosestPeers(byte[] key) { .collect(Collectors.toList()); } + public void addRecord(Multihash publisher, IpnsRecord record) { + ipnsStore.put(publisher, record); + } + + public Optional getRecord(Multihash publisher) { + return ipnsStore.get(publisher); + } + public void receiveRequest(Dht.Message msg, PeerId source, Stream stream) { responderReceivedBytes.inc(msg.getSerializedSize()); switch (msg.getType()) { case PUT_VALUE: { - Optional mapping = IPNS.validateIpnsEntry(msg); + Optional mapping = IPNS.parseAndValidateIpnsEntry(msg); if (mapping.isPresent()) { + Optional existing = ipnsStore.get(mapping.get().publisher); + if (existing.isPresent() && mapping.get().value.compareTo(existing.get()) < 0) { + // don't add 'older' record + return; + } ipnsStore.put(mapping.get().publisher, mapping.get().value); stream.writeAndFlush(msg); responderSentBytes.inc(msg.getSerializedSize()); diff --git a/src/main/java/org/peergos/protocol/dht/RamRecordStore.java b/src/main/java/org/peergos/protocol/dht/RamRecordStore.java index 7ead53c2..10d78190 100644 --- a/src/main/java/org/peergos/protocol/dht/RamRecordStore.java +++ b/src/main/java/org/peergos/protocol/dht/RamRecordStore.java @@ -1,6 +1,5 @@ package org.peergos.protocol.dht; -import io.ipfs.cid.*; import io.ipfs.multihash.*; import org.peergos.protocol.ipns.*; @@ -19,7 +18,7 @@ public void put(Multihash peerId, IpnsRecord record) { } @Override - public Optional get(Cid peerId) { + public Optional get(Multihash peerId) { return Optional.ofNullable(records.get(peerId)); } diff --git a/src/main/java/org/peergos/protocol/dht/RecordStore.java b/src/main/java/org/peergos/protocol/dht/RecordStore.java index 1a1229b9..f400ab0f 100644 --- a/src/main/java/org/peergos/protocol/dht/RecordStore.java +++ b/src/main/java/org/peergos/protocol/dht/RecordStore.java @@ -10,7 +10,7 @@ public interface RecordStore extends AutoCloseable { void put(Multihash peerId, IpnsRecord record); - Optional get(Cid peerId); + Optional get(Multihash peerId); void remove(Multihash peerId); } diff --git a/src/main/java/org/peergos/protocol/ipns/GetResult.java b/src/main/java/org/peergos/protocol/ipns/GetResult.java index 087ec117..709c56c0 100644 --- a/src/main/java/org/peergos/protocol/ipns/GetResult.java +++ b/src/main/java/org/peergos/protocol/ipns/GetResult.java @@ -20,7 +20,7 @@ public static GetResult fromProtobuf(Dht.Message msg) { .map(PeerAddresses::fromProtobuf) .collect(Collectors.toList()); Optional record = msg.hasRecord() ? - IPNS.validateIpnsEntry(msg) : + IPNS.parseAndValidateIpnsEntry(msg) : Optional.empty(); return new GetResult(record, closerPeers); } diff --git a/src/main/java/org/peergos/protocol/ipns/IPNS.java b/src/main/java/org/peergos/protocol/ipns/IPNS.java index 28ea5c13..c65e6ee8 100644 --- a/src/main/java/org/peergos/protocol/ipns/IPNS.java +++ b/src/main/java/org/peergos/protocol/ipns/IPNS.java @@ -34,20 +34,49 @@ public static byte[] getKey(Multihash peerId) { return bout.toByteArray(); } + public static byte[] createSignedRecord(byte[] value, + LocalDateTime expiry, + long sequence, + long ttlNanos, + PrivKey ourKey) { + byte[] cborEntryData = IPNS.createCborDataForIpnsEntry(value, expiry, + Ipns.IpnsEntry.ValidityType.EOL_VALUE, sequence, ttlNanos); + String expiryString = IPNS.formatExpiry(expiry); + byte[] signature = ourKey.sign(IPNS.createSigV2Data(cborEntryData)); + PubKey pubKey = ourKey.publicKey(); + Ipns.IpnsEntry.Builder entryBuilder = Ipns.IpnsEntry.newBuilder() + .setSequence(sequence) + .setTtl(ttlNanos) + .setValue(ByteString.copyFrom(value)) + .setValidityType(Ipns.IpnsEntry.ValidityType.EOL) + .setValidity(ByteString.copyFrom(expiryString.getBytes())) + .setData(ByteString.copyFrom(cborEntryData)) + .setSignatureV2(ByteString.copyFrom(signature)); + if (ourKey.getKeyType() != Crypto.KeyType.Ed25519) { + byte[] pubKeyProtobuf = Crypto.PublicKey.newBuilder() + .setType(pubKey.getKeyType()) + .setData(ByteString.copyFrom(pubKey.raw())) + .build() + .toByteArray(); + entryBuilder = entryBuilder.setPubKey(ByteString.copyFrom(pubKeyProtobuf)); // not needed with Ed25519 + } + return entryBuilder.build().toByteArray(); + } + public static Cid getCidFromKey(ByteString key) { if (! key.startsWith(ByteString.copyFrom("/ipns/".getBytes(StandardCharsets.UTF_8)))) throw new IllegalStateException("Unknown IPNS key space: " + key); return Cid.cast(key.substring(6).toByteArray()); } - public static byte[] createCborDataForIpnsEntry(String pathToPublish, + public static byte[] createCborDataForIpnsEntry(byte[] value, LocalDateTime expiry, long validityType, long sequence, long ttl) { SortedMap state = new TreeMap<>(); state.put("TTL", new CborObject.CborLong(ttl)); - state.put("Value", new CborObject.CborByteArray(pathToPublish.getBytes())); + state.put("Value", new CborObject.CborByteArray(value)); state.put("Sequence", new CborObject.CborLong(sequence)); String expiryString = formatExpiry(expiry); state.put("Validity", new CborObject.CborByteArray(expiryString.getBytes(StandardCharsets.UTF_8))); @@ -66,17 +95,22 @@ public static byte[] createSigV2Data(byte[] data) { } } - public static Optional validateIpnsEntry(Dht.Message msg) { + public static Optional parseAndValidateIpnsEntry(Dht.Message msg) { if (! msg.hasRecord() || msg.getRecord().getValue().size() > IPNS.MAX_RECORD_SIZE) return Optional.empty(); if (! msg.getKey().equals(msg.getRecord().getKey())) return Optional.empty(); - if (! msg.getRecord().getKey().startsWith(ByteString.copyFrom("/ipns/".getBytes(StandardCharsets.UTF_8)))) + byte[] entryBytes = msg.getRecord().getValue().toByteArray(); + return parseAndValidateIpnsEntry(msg.getRecord().getKey().toByteArray(), entryBytes); + } + + public static Optional parseAndValidateIpnsEntry(byte[] key, byte[] entryBytes) { + if (! Arrays.equals(Arrays.copyOfRange(key, 0, 6), "/ipns/".getBytes(StandardCharsets.UTF_8))) return Optional.empty(); - byte[] cidBytes = msg.getRecord().getKey().substring(6).toByteArray(); + byte[] cidBytes = Arrays.copyOfRange(key, 6, key.length); Multihash signer = Multihash.deserialize(cidBytes); try { - Ipns.IpnsEntry entry = Ipns.IpnsEntry.parseFrom(msg.getRecord().getValue()); + Ipns.IpnsEntry entry = Ipns.IpnsEntry.parseFrom(entryBytes); if (! entry.hasSignatureV2() || ! entry.hasData()) return Optional.empty(); PubKey pub; @@ -108,8 +142,7 @@ public static Optional validateIpnsEntry(Dht.Message msg) { LocalDateTime expiry = LocalDateTime.parse(new String(validity).substring(0, validity.length - 1), IPNS.rfc3339nano); if (expiry.isBefore(LocalDateTime.now())) return Optional.empty(); - byte[] entryBytes = msg.getRecord().getValue().toByteArray(); - IpnsRecord record = new IpnsRecord(entryBytes, entry.getSequence(), entry.getTtl(), expiry, entry.getValue().toStringUtf8()); + IpnsRecord record = new IpnsRecord(entryBytes, entry.getSequence(), entry.getTtl(), expiry, entry.getValue().toByteArray()); return Optional.of(new IpnsMapping(signer, record)); } catch (InvalidProtocolBufferException e) { return Optional.empty(); diff --git a/src/main/java/org/peergos/protocol/ipns/IpnsRecord.java b/src/main/java/org/peergos/protocol/ipns/IpnsRecord.java index 44df90e4..5db16e34 100644 --- a/src/main/java/org/peergos/protocol/ipns/IpnsRecord.java +++ b/src/main/java/org/peergos/protocol/ipns/IpnsRecord.java @@ -9,9 +9,9 @@ public class IpnsRecord implements Comparable { public final byte[] raw; public final long sequence, ttlNanos; public final LocalDateTime expiry; - public final String value; + public final byte[] value; - public IpnsRecord(byte[] raw, long sequence, long ttlNanos, LocalDateTime expiry, String value) { + public IpnsRecord(byte[] raw, long sequence, long ttlNanos, LocalDateTime expiry, byte[] value) { this.raw = raw; this.sequence = sequence; this.ttlNanos = ttlNanos; diff --git a/src/test/java/org/peergos/DatabaseRecordStoreTest.java b/src/test/java/org/peergos/DatabaseRecordStoreTest.java index 23fed0da..0bc80d52 100644 --- a/src/test/java/org/peergos/DatabaseRecordStoreTest.java +++ b/src/test/java/org/peergos/DatabaseRecordStoreTest.java @@ -13,7 +13,7 @@ public class DatabaseRecordStoreTest { public void testRecordStore() { try (DatabaseRecordStore bs = new DatabaseRecordStore("mem:")) { LocalDateTime now = LocalDateTime.now(); - IpnsRecord record = new IpnsRecord("raw".getBytes(), 1, 2, now, "value"); + IpnsRecord record = new IpnsRecord("raw".getBytes(), 1, 2, now, "value".getBytes()); Cid peerId = Cid.decode("zb2rhYSxw4ZjuzgCnWSt19Q94ERaeFhu9uSqRgjSdx9bsgM6f"); bs.put(peerId, record); //make sure PUTing a second time succeeds diff --git a/src/test/java/org/peergos/EmbeddedIpfsTest.java b/src/test/java/org/peergos/EmbeddedIpfsTest.java index 8aa4b356..dab8a67d 100644 --- a/src/test/java/org/peergos/EmbeddedIpfsTest.java +++ b/src/test/java/org/peergos/EmbeddedIpfsTest.java @@ -3,15 +3,19 @@ import identify.pb.*; import io.ipfs.cid.*; import io.ipfs.multiaddr.*; +import io.ipfs.multihash.Multihash; import io.libp2p.core.*; import io.libp2p.core.crypto.*; import io.libp2p.core.multiformats.*; +import io.libp2p.crypto.keys.*; import io.libp2p.protocol.*; import org.junit.*; import org.peergos.blockstore.*; import org.peergos.config.*; import org.peergos.protocol.dht.*; +import org.peergos.protocol.ipns.*; +import java.time.*; import java.util.*; import java.util.concurrent.*; import java.util.stream.*; @@ -39,6 +43,62 @@ public void largeBlock() throws Exception { node2.stop(); } + @Test + public void publishValue() throws Exception { + EmbeddedIpfs node1 = build(BootstrapTest.BOOTSTRAP_NODES, List.of(new MultiAddress("/ip4/127.0.0.1/tcp/" + TestPorts.getPort()))); + node1.start(); + + PrivKey publisher = Ed25519Kt.generateEd25519KeyPair().getFirst(); + byte[] value = "This is a test".getBytes(); + node1.publishValue(publisher, value, 1, 24).join(); + byte[] res = node1.resolveValue(publisher.publicKey(), 5).join(); + Assert.assertTrue(Arrays.equals(res, value)); + + node1.stop(); + } + + @Test + public void publishPresignedValue() throws Exception { + EmbeddedIpfs node1 = build(BootstrapTest.BOOTSTRAP_NODES, List.of(new MultiAddress("/ip4/127.0.0.1/tcp/" + TestPorts.getPort()))); + node1.start(); + + PrivKey publisher = Ed25519Kt.generateEd25519KeyPair().getFirst(); + byte[] value = "This is a test".getBytes(); + io.ipfs.multihash.Multihash pub = Multihash.deserialize(PeerId.fromPubKey(publisher.publicKey()).getBytes()); + long hoursTtl = 24*365; + LocalDateTime expiry = LocalDateTime.now().plusHours(hoursTtl); + long ttlNanos = hoursTtl * 3600_000_000_000L; + byte[] signedRecord = IPNS.createSignedRecord(value, expiry, 1, ttlNanos, publisher); + node1.publishPresignedRecord(pub, signedRecord).join(); + node1.publishPresignedRecord(pub, signedRecord).join(); + node1.publishPresignedRecord(pub, signedRecord).join(); + + byte[] res = node1.resolveValue(publisher.publicKey(), 5).join(); + Assert.assertTrue(Arrays.equals(res, value)); + + // publish an updated value with same expiry + byte[] value2 = "Updated value".getBytes(); + byte[] signedRecord2 = IPNS.createSignedRecord(value2, expiry, 2, ttlNanos, publisher); + node1.publishPresignedRecord(pub, signedRecord2).join(); + node1.publishPresignedRecord(pub, signedRecord2).join(); + node1.publishPresignedRecord(pub, signedRecord2).join(); + + byte[] res2 = node1.resolveValue(publisher.publicKey(), 5).join(); + Assert.assertTrue(Arrays.equals(res2, value2)); + + // publish an updated value with earlier expiry + byte[] value3 = "3rd value to put in IPNS".getBytes(); + byte[] signedRecord3 = IPNS.createSignedRecord(value3, expiry.minusDays(1), 3, ttlNanos, publisher); + node1.publishPresignedRecord(pub, signedRecord3).join(); + node1.publishPresignedRecord(pub, signedRecord3).join(); + node1.publishPresignedRecord(pub, signedRecord3).join(); + + byte[] res3 = node1.resolveValue(publisher.publicKey(), 5).join(); + Assert.assertTrue(Arrays.equals(res3, value3)); + + node1.stop(); + } + @Test public void wildcardListenerAddressesGetExpanded() { int node1Port = TestPorts.getPort(); diff --git a/src/test/java/org/peergos/FindPeerTest.java b/src/test/java/org/peergos/FindPeerTest.java index 91c1610a..51173c18 100644 --- a/src/test/java/org/peergos/FindPeerTest.java +++ b/src/test/java/org/peergos/FindPeerTest.java @@ -49,7 +49,7 @@ private static long findAndDialPeer(Multihash toFind, Kademlia dht1, Host node1) PeerAddresses peer = matching.get(); Multiaddr[] addrs = peer.getPublicAddresses().stream().map(a -> Multiaddr.fromString(a.toString())).toArray(Multiaddr[]::new); dht1.dial(node1, PeerId.fromBase58(peer.peerId.toBase58()), addrs) - .getController().join().closerPeers(toFind).join(); + .getController().join().closerPeers(toFind.toBytes()).join(); System.out.println("Peer lookup took " + (t2-t1) + "ms"); return t2 - t1; } diff --git a/src/test/java/org/peergos/IpnsTest.java b/src/test/java/org/peergos/IpnsTest.java index f917cde6..72d4d3fa 100644 --- a/src/test/java/org/peergos/IpnsTest.java +++ b/src/test/java/org/peergos/IpnsTest.java @@ -2,7 +2,6 @@ import io.ipfs.api.*; import io.ipfs.cid.*; -import io.ipfs.multiaddr.*; import io.ipfs.multihash.Multihash; import io.libp2p.core.*; import io.libp2p.core.multiformats.*; @@ -48,8 +47,9 @@ public void publishIPNSRecordToKubo() throws IOException { for (int i = 0; i < 10; i++) { try { + byte[] value = IPNS.createSignedRecord(pathToPublish.getBytes(), expiry, sequence, ttl, node1.getPrivKey()); success = dht.dial(node1, address2).getController().join() - .putValue(pathToPublish, expiry, sequence, ttl, node1Id, node1.getPrivKey()) + .putValue(node1Id, value) .orTimeout(2, TimeUnit.SECONDS).join(); break; } catch (Exception timeout) { diff --git a/src/test/java/org/peergos/KademliaTest.java b/src/test/java/org/peergos/KademliaTest.java index 96ad00c2..b9517cc2 100644 --- a/src/test/java/org/peergos/KademliaTest.java +++ b/src/test/java/org/peergos/KademliaTest.java @@ -87,7 +87,7 @@ public void ipnsBenchmark() throws Exception { List signers = new ArrayList<>(); long publishTotal = 0, resolveTotal = 0; - int iterations = 10; + int iterations = 25; for (int i = 0; i < iterations; i++) { // publish mapping from node 1 PrivKey signer = Ed25519Kt.generateEd25519KeyPair().getFirst(); @@ -96,7 +96,7 @@ public void ipnsBenchmark() throws Exception { long p0 = System.currentTimeMillis(); dht1.publishIpnsValue(signer, pub, value, 1, node1).join(); long p1 = System.currentTimeMillis(); - System.out.println("Publish took " + (p1-p0) + "ms"); + System.out.println("Publish took " + printSeconds(p1-p0) + "s"); publishTotal += p1-p0; // retrieve it from node 2 @@ -104,7 +104,7 @@ public void ipnsBenchmark() throws Exception { String res = dht2.resolveIpnsValue(pub, node2, 1).orTimeout(10, TimeUnit.SECONDS).join(); long t1 = System.currentTimeMillis(); Assert.assertTrue(res.equals("/ipfs/" + value)); - System.out.println("Resolved in " + (t1 - t0) + "ms"); + System.out.println("Resolved in " + printSeconds(t1 - t0) + "s"); resolveTotal += t1-t0; } System.out.println("Publish av: " + publishTotal/iterations + ", resolve av: " + resolveTotal/iterations); @@ -116,7 +116,7 @@ public void ipnsBenchmark() throws Exception { String res = dht2.resolveIpnsValue(pub, node2, 1).orTimeout(10, TimeUnit.SECONDS).join(); long t1 = System.currentTimeMillis(); Assert.assertTrue(res.equals("/ipfs/" + value)); - System.out.println("Resolved again in " + (t1 - t0) + "ms"); + System.out.println("Resolved again in " + printSeconds(t1 - t0) + "s"); } } finally { node1.stop(); @@ -124,6 +124,10 @@ public void ipnsBenchmark() throws Exception { } } + public static String printSeconds(long millis) { + return millis / 1000 + "." + (millis % 1000)/100; + } + @Test public void kademliaFindNodeLimitTest() { PeerId us = new HostBuilder().generateIdentity().getPeerId();