Skip to content

Commit

Permalink
Generalise ipns publishing and resolution to arbitrary byte[]
Browse files Browse the repository at this point in the history
  • Loading branch information
ianopolous committed Dec 11, 2023
1 parent 9cd535d commit aec72ef
Show file tree
Hide file tree
Showing 9 changed files with 86 additions and 26 deletions.
17 changes: 17 additions & 0 deletions src/main/java/org/peergos/EmbeddedIpfs.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.ipfs.multiaddr.*;
import io.ipfs.multihash.Multihash;
import io.libp2p.core.*;
import io.libp2p.core.crypto.*;
import io.libp2p.core.multiformats.*;
import io.libp2p.core.multistream.*;
import io.libp2p.protocol.*;
Expand All @@ -28,6 +29,7 @@
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.time.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.logging.*;
Expand Down Expand Up @@ -98,6 +100,21 @@ public List<HashedBlock> getBlocks(List<Want> wants, Set<PeerId> peers, boolean
.collect(Collectors.toList());
}

public CompletableFuture<Void> publishValue(PrivKey priv, byte[] value, long sequence) {
Multihash pub = Multihash.deserialize(PeerId.fromPubKey(priv.publicKey()).getBytes());
int hours = 1;
LocalDateTime expiry = LocalDateTime.now().plusHours(hours);
long ttlNanos = hours * 3600_000_000_000L;
return dht.publishValue(priv, pub, value, sequence, expiry, ttlNanos, node);
}

public CompletableFuture<byte[]> resolveValue(PubKey pub) {
Multihash publisher = Multihash.deserialize(PeerId.fromPubKey(pub).getBytes());
CompletableFuture<byte[]> res = new CompletableFuture<>();
dht.resolveValue(publisher, node, Kademlia.getNRecords(2, res));
return res;
}

public void start() {
LOG.info("Starting IPFS...");
Thread shutdownHook = new Thread(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import java.sql.*;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Optional;
import java.util.*;

public class DatabaseRecordStore implements RecordStore {

Expand Down Expand Up @@ -76,7 +76,7 @@ public Optional<IpnsRecord> get(Cid peerId) {
LocalDateTime expiry = LocalDateTime.ofEpochSecond(rs.getLong("expiryUTC"),
0, ZoneOffset.UTC);
IpnsRecord record = new IpnsRecord(bout.toByteArray(), rs.getLong("sequence"),
rs.getLong("ttlNanos"), expiry, rs.getString("val"));
rs.getLong("ttlNanos"), expiry, rs.getString("val").getBytes());
return Optional.of(record);
} catch (IOException readEx) {
throw new IllegalStateException(readEx);
Expand All @@ -102,8 +102,8 @@ public void put(Multihash peerId, IpnsRecord record) {
pstmt.setLong(3, record.sequence);
pstmt.setLong(4, record.ttlNanos);
pstmt.setLong(5, record.expiry.toEpochSecond(ZoneOffset.UTC));
pstmt.setString(6, record.value.length() > SIZE_OF_VAL ?
record.value.substring(0, SIZE_OF_VAL) : record.value);
pstmt.setString(6, new String(record.value.length > SIZE_OF_VAL ?
Arrays.copyOfRange(record.value, 0, SIZE_OF_VAL) : record.value));
pstmt.executeUpdate();
} catch (SQLException ex) {
throw new IllegalStateException(ex);
Expand Down
52 changes: 40 additions & 12 deletions src/main/java/org/peergos/protocol/dht/Kademlia.java
Original file line number Diff line number Diff line change
Expand Up @@ -297,19 +297,33 @@ public CompletableFuture<Void> provideBlock(Multihash block, Host us, PeerAddres
return CompletableFuture.allOf(provides.toArray(new CompletableFuture[0]));
}

public CompletableFuture<Void> publishIpnsValue(PrivKey priv, Multihash publisher, Multihash value, long sequence, Host us) {
public CompletableFuture<Void> publishIpnsValue(PrivKey priv,
Multihash publisher,
Multihash value,
long sequence,
Host us) {
int hours = 1;
LocalDateTime expiry = LocalDateTime.now().plusHours(hours);
long ttl = hours * 3600_000_000_000L;
long ttlNanos = hours * 3600_000_000_000L;
byte[] publishValue = ("/ipfs/" + value).getBytes();
return publishValue(priv, publisher, publishValue, sequence, expiry, ttlNanos, us);
}

public CompletableFuture<Void> publishValue(PrivKey priv,
Multihash publisher,
byte[] publishValue,
long sequence,
LocalDateTime expiry,
long ttlNanos,
Host us) {
int publishes = 0;
for (int i=0; i < 5 && publishes < 20; i++) {
List<PeerAddresses> closestPeers = findClosestPeers(publisher, 25, us);
publishes += closestPeers.stream().parallel().mapToInt(peer -> {
try {
boolean success = dialPeer(peer, us).join()
.putValue("/ipfs/" + value, expiry, sequence,
ttl, publisher, priv).join();
.putValue(publishValue, expiry, sequence,
ttlNanos, publisher, priv).join();
if (success)
return 1;
} catch (Exception e) {}
Expand All @@ -319,9 +333,28 @@ public CompletableFuture<Void> publishIpnsValue(PrivKey priv, Multihash publishe
return CompletableFuture.completedFuture(null);
}

public static Predicate<IpnsRecord> getNRecords(int minResults, CompletableFuture<byte[]> res) {
List<IpnsRecord> candidates = new ArrayList<>();
return rec -> {
candidates.add(rec);
if (candidates.size() >= minResults) {
// Validate and sort records by sequence number
List<IpnsRecord> records = candidates.stream().sorted().collect(Collectors.toList());
res.complete(records.get(records.size() - 1).value);
return false;
}
return true;
};
}

public CompletableFuture<String> resolveIpnsValue(Multihash publisher, Host us, int minResults) {
CompletableFuture<byte[]> res = new CompletableFuture<>();
resolveValue(publisher, us, getNRecords(minResults, res));
return res.thenApply(String::new);
}

public void resolveValue(Multihash publisher, Host us, Predicate<IpnsRecord> getMore) {
List<PeerAddresses> closestPeers = findClosestPeers(publisher, 20, us);
List<IpnsRecord> candidates = new ArrayList<>();
Set<PeerAddresses> queryCandidates = new HashSet<>();
Set<Multihash> queriedPeers = new HashSet<>();
for (PeerAddresses peer : closestPeers) {
Expand All @@ -331,15 +364,10 @@ public CompletableFuture<String> resolveIpnsValue(Multihash publisher, Host us,
try {
GetResult res = dialPeer(peer, us).join().getValue(publisher).join();
if (res.record.isPresent() && res.record.get().publisher.equals(publisher))
candidates.add(res.record.get().value);
if ( !getMore.test(res.record.get().value))
return;
queryCandidates.addAll(res.closerPeers);
} catch (Exception e) {}
if (candidates.size() >= minResults)
break;
}

// Validate and sort records by sequence number
List<IpnsRecord> records = candidates.stream().sorted().collect(Collectors.toList());
return CompletableFuture.completedFuture(records.get(records.size() - 1).value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ default CompletableFuture<Providers> getProviders(Multihash block) {
.thenApply(Providers::fromProtobuf);
}

default CompletableFuture<Boolean> putValue(String pathToPublish, LocalDateTime expiry, long sequence,
default CompletableFuture<Boolean> putValue(byte[] value, LocalDateTime expiry, long sequence,
long ttlNanos, Multihash peerId, PrivKey ourKey) {
byte[] cborEntryData = IPNS.createCborDataForIpnsEntry(pathToPublish, expiry,
byte[] cborEntryData = IPNS.createCborDataForIpnsEntry(value, expiry,
Ipns.IpnsEntry.ValidityType.EOL_VALUE, sequence, ttlNanos);
String expiryString = IPNS.formatExpiry(expiry);
byte[] signature = ourKey.sign(IPNS.createSigV2Data(cborEntryData));
Expand All @@ -62,7 +62,7 @@ default CompletableFuture<Boolean> putValue(String pathToPublish, LocalDateTime
byte[] ipnsEntry = Ipns.IpnsEntry.newBuilder()
.setSequence(sequence)
.setTtl(ttlNanos)
.setValue(ByteString.copyFrom(pathToPublish.getBytes()))
.setValue(ByteString.copyFrom(value))
.setValidityType(Ipns.IpnsEntry.ValidityType.EOL)
.setValidity(ByteString.copyFrom(expiryString.getBytes()))
.setData(ByteString.copyFrom(cborEntryData))
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/org/peergos/protocol/ipns/IPNS.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ public static Cid getCidFromKey(ByteString key) {
return Cid.cast(key.substring(6).toByteArray());
}

public static byte[] createCborDataForIpnsEntry(String pathToPublish,
public static byte[] createCborDataForIpnsEntry(byte[] value,
LocalDateTime expiry,
long validityType,
long sequence,
long ttl) {
SortedMap<String, Cborable> state = new TreeMap<>();
state.put("TTL", new CborObject.CborLong(ttl));
state.put("Value", new CborObject.CborByteArray(pathToPublish.getBytes()));
state.put("Value", new CborObject.CborByteArray(value));
state.put("Sequence", new CborObject.CborLong(sequence));
String expiryString = formatExpiry(expiry);
state.put("Validity", new CborObject.CborByteArray(expiryString.getBytes(StandardCharsets.UTF_8)));
Expand Down Expand Up @@ -109,7 +109,7 @@ public static Optional<IpnsMapping> validateIpnsEntry(Dht.Message msg) {
if (expiry.isBefore(LocalDateTime.now()))
return Optional.empty();
byte[] entryBytes = msg.getRecord().getValue().toByteArray();
IpnsRecord record = new IpnsRecord(entryBytes, entry.getSequence(), entry.getTtl(), expiry, entry.getValue().toStringUtf8());
IpnsRecord record = new IpnsRecord(entryBytes, entry.getSequence(), entry.getTtl(), expiry, entry.getValue().toByteArray());
return Optional.of(new IpnsMapping(signer, record));
} catch (InvalidProtocolBufferException e) {
return Optional.empty();
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/peergos/protocol/ipns/IpnsRecord.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ public class IpnsRecord implements Comparable<IpnsRecord> {
public final byte[] raw;
public final long sequence, ttlNanos;
public final LocalDateTime expiry;
public final String value;
public final byte[] value;

public IpnsRecord(byte[] raw, long sequence, long ttlNanos, LocalDateTime expiry, String value) {
public IpnsRecord(byte[] raw, long sequence, long ttlNanos, LocalDateTime expiry, byte[] value) {
this.raw = raw;
this.sequence = sequence;
this.ttlNanos = ttlNanos;
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/org/peergos/DatabaseRecordStoreTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public class DatabaseRecordStoreTest {
public void testRecordStore() {
try (DatabaseRecordStore bs = new DatabaseRecordStore("mem:")) {
LocalDateTime now = LocalDateTime.now();
IpnsRecord record = new IpnsRecord("raw".getBytes(), 1, 2, now, "value");
IpnsRecord record = new IpnsRecord("raw".getBytes(), 1, 2, now, "value".getBytes());
Cid peerId = Cid.decode("zb2rhYSxw4ZjuzgCnWSt19Q94ERaeFhu9uSqRgjSdx9bsgM6f");
bs.put(peerId, record);
//make sure PUTing a second time succeeds
Expand Down
15 changes: 15 additions & 0 deletions src/test/java/org/peergos/EmbeddedIpfsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.libp2p.core.*;
import io.libp2p.core.crypto.*;
import io.libp2p.core.multiformats.*;
import io.libp2p.crypto.keys.*;
import io.libp2p.protocol.*;
import org.junit.*;
import org.peergos.blockstore.*;
Expand Down Expand Up @@ -39,6 +40,20 @@ public void largeBlock() throws Exception {
node2.stop();
}

@Test
public void publishValue() throws Exception {
EmbeddedIpfs node1 = build(BootstrapTest.BOOTSTRAP_NODES, List.of(new MultiAddress("/ip4/127.0.0.1/tcp/" + TestPorts.getPort())));
node1.start();

PrivKey publisher = Ed25519Kt.generateEd25519KeyPair().getFirst();
byte[] value = "This is a test".getBytes();
node1.publishValue(publisher, value, 1).join();
byte[] res = node1.resolveValue(publisher.publicKey()).join();
Assert.assertTrue(Arrays.equals(res, value));

node1.stop();
}

@Test
public void wildcardListenerAddressesGetExpanded() {
int node1Port = TestPorts.getPort();
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/org/peergos/IpnsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public void publishIPNSRecordToKubo() throws IOException {
for (int i = 0; i < 10; i++) {
try {
success = dht.dial(node1, address2).getController().join()
.putValue(pathToPublish, expiry, sequence, ttl, node1Id, node1.getPrivKey())
.putValue(pathToPublish.getBytes(), expiry, sequence, ttl, node1Id, node1.getPrivKey())
.orTimeout(2, TimeUnit.SECONDS).join();
break;
} catch (Exception timeout) {
Expand Down

0 comments on commit aec72ef

Please sign in to comment.