diff --git a/src/main/java/org/peergos/IpnsPublisher.java b/src/main/java/org/peergos/IpnsPublisher.java index 8529f451..bea847d2 100644 --- a/src/main/java/org/peergos/IpnsPublisher.java +++ b/src/main/java/org/peergos/IpnsPublisher.java @@ -134,7 +134,7 @@ public static List resolveAndRepublish(List publishers, res.add(records.size()); done++; if (done % 10 == 0) - System.out.println("resolved " + done); + System.out.println("resolved " + res.stream().filter(c -> c > 0).count() + " / " + done); CompletableFuture.supplyAsync(() -> publisher.publishPresignedRecord(pub.pub, pub.record)); } return res; diff --git a/src/main/java/org/peergos/protocol/dht/Kademlia.java b/src/main/java/org/peergos/protocol/dht/Kademlia.java index f7127914..d76f8232 100644 --- a/src/main/java/org/peergos/protocol/dht/Kademlia.java +++ b/src/main/java/org/peergos/protocol/dht/Kademlia.java @@ -319,15 +319,15 @@ public CompletableFuture publishIpnsValue(PrivKey priv, return publishValue(publisher, signedRecord, us); } - private boolean putValue(Multihash publisher, - byte[] signedRecord, - PeerAddresses peer, - Host us) { + private CompletableFuture putValue(Multihash publisher, + byte[] signedRecord, + PeerAddresses peer, + Host us) { try { return dialPeer(peer, us).join() - .putValue(publisher, signedRecord).join(); + .putValue(publisher, signedRecord); } catch (Exception e) {} - return false; + return CompletableFuture.completedFuture(false); } private boolean hasTransportOverlap(PeerAddresses p) { @@ -378,10 +378,11 @@ public CompletableFuture publishValue(Multihash publisher, more.add(e); } } - ioExec.submit(() -> { - if (putValue(publisher, signedRecord, r.addresses, us)) - publishes.add(r.addresses.peerId); - }); + ioExec.submit(() -> putValue(publisher, signedRecord, r.addresses, us) + .thenAccept(done -> { + if (done) + publishes.add(r.addresses.peerId); + })); return more; }).join()); }) @@ -407,10 +408,11 @@ public CompletableFuture publishValue(Multihash publisher, .map(r -> { toQuery.remove(r); queried.add(r.addresses.peerId); - return ioExec.submit(() -> { - if (putValue(publisher, signedRecord, r.addresses, us)) - publishes.add(r.addresses.peerId); - }); + return ioExec.submit(() -> putValue(publisher, signedRecord, r.addresses, us) + .thenAccept(done -> { + if (done) + publishes.add(r.addresses.peerId); + })); }) .collect(Collectors.toList()); lastFutures.forEach(f -> { @@ -433,16 +435,21 @@ public CompletableFuture resolveIpnsValue(Multihash publisher, Host us, return CompletableFuture.completedFuture(new String(records.get(records.size() - 1).value)); } - private Optional getValueFromPeer(PeerAddresses peer, Multihash publisher, Host us) { + private CompletableFuture> getValueFromPeer(PeerAddresses peer, Multihash publisher, Host us) { try { - return Optional.of(dialPeer(peer, us).orTimeout(1, TimeUnit.SECONDS).join() - .getValue(publisher).orTimeout(1, TimeUnit.SECONDS).join()); - } catch (Exception e) {} - return Optional.empty(); + return dialPeer(peer, us) + .orTimeout(1, TimeUnit.SECONDS) + .join() + .getValue(publisher) + .orTimeout(1, TimeUnit.SECONDS) + .thenApply(Optional::of); + } catch (Exception e) { + return CompletableFuture.completedFuture(Optional.empty()); + } } public List resolveValue(Multihash publisher, int minResults, Host us) { byte[] key = IPNS.getKey(publisher); - List candidates = new ArrayList<>(); + List candidates = Collections.synchronizedList(new ArrayList<>()); Optional local = engine.getRecord(publisher); local.ifPresent(candidates::add); @@ -465,8 +472,8 @@ public List resolveValue(Multihash publisher, int minResults, Host u .map(r -> { toQuery.remove(r); queried.add(r.addresses.peerId); - return ioExec.submit(() -> getValueFromPeer(r.addresses, publisher, us) - .ifPresent(g -> { + return ioExec.submit(() -> getValueFromPeer(r.addresses, publisher, us).thenAccept(get -> + get.ifPresent(g -> { if (g.record.isPresent() && g.record.get().publisher.equals(publisher)) candidates.add(g.record.get().value); for (PeerAddresses peer : g.closerPeers) { @@ -476,7 +483,7 @@ public List resolveValue(Multihash publisher, int minResults, Host u toQuery.add(e); } } - })); + }))); }) .collect(Collectors.toList()); futures.forEach(f -> {