From 45af7cce5cf79b699feb1aea4c442979d2b5f4b1 Mon Sep 17 00:00:00 2001 From: Ben Date: Tue, 17 Dec 2024 12:20:29 +0100 Subject: [PATCH 1/2] wip --- codex/blockexchange/engine/discovery.nim | 22 +++++++++++------ codex/blockexchange/engine/engine.nim | 25 +++++++++++--------- codex/blockexchange/engine/pendingblocks.nim | 13 ---------- codex/blockexchange/peers/peercontext.nim | 6 ----- codex/blockexchange/peers/peerctxstore.nim | 12 ++++++---- codex/blocktype.nim | 6 ----- 6 files changed, 36 insertions(+), 48 deletions(-) diff --git a/codex/blockexchange/engine/discovery.nim b/codex/blockexchange/engine/discovery.nim index 16ef560b4..2a291c9f0 100644 --- a/codex/blockexchange/engine/discovery.nim +++ b/codex/blockexchange/engine/discovery.nim @@ -56,9 +56,19 @@ type discoveryLoopSleep: Duration # Discovery loop sleep inFlightDiscReqs*: Table[Cid, Future[seq[SignedPeerRecord]]] # Inflight discovery requests +proc getCid(address: BlockAddress): Cid = + # We advertise and discover only the CID part of a block address. + # Indices are ignored. This means that multiple blocks of the same tree will + # have a single DHT entry. + if address.leaf: + address.treeCid + else: + address.cid + proc discoveryQueueLoop(b: DiscoveryEngine) {.async: (raises: []).} = while b.discEngineRunning: - for cid in toSeq(b.pendingBlocks.wantListBlockCids): + for address in toSeq(b.pendingBlocks.wantList): + let cid = address.getCid() try: await b.discoveryQueue.put(cid) except CancelledError: @@ -88,10 +98,7 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async: (raises: []).} = trace "Discovery request already in progress", cid continue - let - haves = b.peers.peersHave(cid) - - if haves.len < b.minPeersPerBlock: + if b.peers.countPeersWhoHave(cid) < b.minPeersPerBlock: try: let request = b.discovery @@ -127,8 +134,9 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async: (raises: []).} = info "Exiting discovery task runner" -proc queueFindBlocksReq*(b: DiscoveryEngine, cids: seq[Cid]) {.inline.} = - for cid in cids: +proc queueFindBlocksReq*(b: DiscoveryEngine, addresses: seq[BlockAddress]) {.inline.} = + for address in addresses: + let cid = address.getCid() if cid notin b.discoveryQueue: try: b.discoveryQueue.putNoWait(cid) diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index c79e1eab3..6c66f78d3 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -173,7 +173,7 @@ proc monitorBlockHandle( # drop unresponsive peer await b.network.switch.disconnect(peerId) - b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid]) + b.discovery.queueFindBlocksReq(@[address]) proc pickPseudoRandom(address: BlockAddress, peers: seq[BlockExcPeerCtx]): BlockExcPeerCtx = return peers[hash(address) mod peers.len] @@ -188,7 +188,7 @@ proc requestBlock*( let peers = b.peers.getPeersForBlock(address) if peers.with.len == 0: - b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid]) + b.discovery.queueFindBlocksReq(@[address]) else: let selected = pickPseudoRandom(address, peers.with) asyncSpawn b.monitorBlockHandle(blockFuture, address, selected.id) @@ -246,21 +246,24 @@ proc blockPresenceHandler*( # if none of the connected peers report our wants in their have list, # fire up discovery b.discovery.queueFindBlocksReq( - toSeq(b.pendingBlocks.wantListCids) - .filter do(cid: Cid) -> bool: - not b.peers.anyIt( cid in it.peerHaveCids )) + toSeq(b.pendingBlocks.wantList).filterIt(b.peers.peersHave(it).len == 0) + ) proc scheduleTasks(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.async.} = let - cids = blocksDelivery.mapIt( it.blk.cid ) + addresses = blocksDelivery.mapIt( it.address ) + + # TODO: This code assumes p.peerWants are of type wantBlock, and will schedule + # the block-sending task. But, want might be wantHave. In this case, + # we should send a presence update. Peer is scheduled but task handler + # can only send blocks. # schedule any new peers to provide blocks to for p in b.peers: - for c in cids: # for each cid - # schedule a peer if it wants at least one cid - # and we have it in our local store - if c in p.peerWantsCids: - if await (c in b.localStore): + for address in addresses: + # schedule a peer if it wants at least one + if address in p.peerWants: + if await (address in b.localStore): if b.scheduleTask(p): trace "Task scheduled for peer", peer = p.id else: diff --git a/codex/blockexchange/engine/pendingblocks.nim b/codex/blockexchange/engine/pendingblocks.nim index 9c5efc0b9..cf2d08ab5 100644 --- a/codex/blockexchange/engine/pendingblocks.nim +++ b/codex/blockexchange/engine/pendingblocks.nim @@ -130,19 +130,6 @@ iterator wantList*(p: PendingBlocksManager): BlockAddress = for a in p.blocks.keys: yield a -iterator wantListBlockCids*(p: PendingBlocksManager): Cid = - for a in p.blocks.keys: - if not a.leaf: - yield a.cid - -iterator wantListCids*(p: PendingBlocksManager): Cid = - var yieldedCids = initHashSet[Cid]() - for a in p.blocks.keys: - let cid = a.cidOrTreeCid - if cid notin yieldedCids: - yieldedCids.incl(cid) - yield cid - iterator wantHandles*(p: PendingBlocksManager): Future[Block] = for v in p.blocks.values: yield v.handle diff --git a/codex/blockexchange/peers/peercontext.nim b/codex/blockexchange/peers/peercontext.nim index 727676de8..1cbf174e3 100644 --- a/codex/blockexchange/peers/peercontext.nim +++ b/codex/blockexchange/peers/peercontext.nim @@ -38,12 +38,6 @@ type proc peerHave*(self: BlockExcPeerCtx): seq[BlockAddress] = toSeq(self.blocks.keys) -proc peerHaveCids*(self: BlockExcPeerCtx): HashSet[Cid] = - self.blocks.keys.toSeq.mapIt(it.cidOrTreeCid).toHashSet - -proc peerWantsCids*(self: BlockExcPeerCtx): HashSet[Cid] = - self.peerWants.mapIt(it.address.cidOrTreeCid).toHashSet - proc contains*(self: BlockExcPeerCtx, address: BlockAddress): bool = address in self.blocks diff --git a/codex/blockexchange/peers/peerctxstore.nim b/codex/blockexchange/peers/peerctxstore.nim index 4b65d8491..b145dd31d 100644 --- a/codex/blockexchange/peers/peerctxstore.nim +++ b/codex/blockexchange/peers/peerctxstore.nim @@ -64,15 +64,17 @@ func len*(self: PeerCtxStore): int = func peersHave*(self: PeerCtxStore, address: BlockAddress): seq[BlockExcPeerCtx] = toSeq(self.peers.values).filterIt( it.peerHave.anyIt( it == address ) ) -func peersHave*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] = - toSeq(self.peers.values).filterIt( it.peerHave.anyIt( it.cidOrTreeCid == cid ) ) +func countPeersWhoHave*(self: PeerCtxStore, cid: Cid): int = + proc getCid(address: BlockAddress): Cid = + if address.leaf: + address.treeCid + else: + address.cid + self.peers.values.countIt(it.peerHave.anyIt( it.getCid() == cid ) ) func peersWant*(self: PeerCtxStore, address: BlockAddress): seq[BlockExcPeerCtx] = toSeq(self.peers.values).filterIt( it.peerWants.anyIt( it == address ) ) -func peersWant*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] = - toSeq(self.peers.values).filterIt( it.peerWants.anyIt( it.address.cidOrTreeCid == cid ) ) - proc getPeersForBlock*(self: PeerCtxStore, address: BlockAddress): PeersForBlock = var res = PeersForBlock() for peer in self: diff --git a/codex/blocktype.nim b/codex/blocktype.nim index c44e4fd88..a149068d7 100644 --- a/codex/blocktype.nim +++ b/codex/blocktype.nim @@ -66,12 +66,6 @@ proc `$`*(a: BlockAddress): string = else: "cid: " & $a.cid -proc cidOrTreeCid*(a: BlockAddress): Cid = - if a.leaf: - a.treeCid - else: - a.cid - proc address*(b: Block): BlockAddress = BlockAddress(leaf: false, cid: b.cid) From 8318eac6cc88f42fe94ee46718f5068cfdf2189d Mon Sep 17 00:00:00 2001 From: Ben Date: Tue, 17 Dec 2024 12:52:03 +0100 Subject: [PATCH 2/2] Restore cidOrTreeCid --- codex/blockexchange/engine/discovery.nim | 19 ++++++------------- codex/blockexchange/peers/peerctxstore.nim | 7 +------ codex/blocktype.nim | 6 ++++++ .../codex/blockexchange/testpendingblocks.nim | 4 ++-- 4 files changed, 15 insertions(+), 21 deletions(-) diff --git a/codex/blockexchange/engine/discovery.nim b/codex/blockexchange/engine/discovery.nim index 2a291c9f0..ab67cf82a 100644 --- a/codex/blockexchange/engine/discovery.nim +++ b/codex/blockexchange/engine/discovery.nim @@ -56,19 +56,10 @@ type discoveryLoopSleep: Duration # Discovery loop sleep inFlightDiscReqs*: Table[Cid, Future[seq[SignedPeerRecord]]] # Inflight discovery requests -proc getCid(address: BlockAddress): Cid = - # We advertise and discover only the CID part of a block address. - # Indices are ignored. This means that multiple blocks of the same tree will - # have a single DHT entry. - if address.leaf: - address.treeCid - else: - address.cid - proc discoveryQueueLoop(b: DiscoveryEngine) {.async: (raises: []).} = while b.discEngineRunning: for address in toSeq(b.pendingBlocks.wantList): - let cid = address.getCid() + let cid = address.cidOrTreeCid try: await b.discoveryQueue.put(cid) except CancelledError: @@ -134,15 +125,17 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async: (raises: []).} = info "Exiting discovery task runner" -proc queueFindBlocksReq*(b: DiscoveryEngine, addresses: seq[BlockAddress]) {.inline.} = - for address in addresses: - let cid = address.getCid() +proc queueFindBlocksReq*(b: DiscoveryEngine, cids: seq[Cid]) {.inline.} = + for cid in cids: if cid notin b.discoveryQueue: try: b.discoveryQueue.putNoWait(cid) except CatchableError as exc: warn "Exception queueing discovery request", exc = exc.msg +proc queueFindBlocksReq*(b: DiscoveryEngine, addresses: seq[BlockAddress]) {.inline.} = + b.queueFindBlocksReq(addresses.mapIt(it.cidOrTreeCid)) + proc start*(b: DiscoveryEngine) {.async.} = ## Start the discengine task ## diff --git a/codex/blockexchange/peers/peerctxstore.nim b/codex/blockexchange/peers/peerctxstore.nim index b145dd31d..2d8380944 100644 --- a/codex/blockexchange/peers/peerctxstore.nim +++ b/codex/blockexchange/peers/peerctxstore.nim @@ -65,12 +65,7 @@ func peersHave*(self: PeerCtxStore, address: BlockAddress): seq[BlockExcPeerCtx] toSeq(self.peers.values).filterIt( it.peerHave.anyIt( it == address ) ) func countPeersWhoHave*(self: PeerCtxStore, cid: Cid): int = - proc getCid(address: BlockAddress): Cid = - if address.leaf: - address.treeCid - else: - address.cid - self.peers.values.countIt(it.peerHave.anyIt( it.getCid() == cid ) ) + self.peers.values.countIt(it.peerHave.anyIt( it.cidOrTreeCid == cid ) ) func peersWant*(self: PeerCtxStore, address: BlockAddress): seq[BlockExcPeerCtx] = toSeq(self.peers.values).filterIt( it.peerWants.anyIt( it == address ) ) diff --git a/codex/blocktype.nim b/codex/blocktype.nim index a149068d7..c44e4fd88 100644 --- a/codex/blocktype.nim +++ b/codex/blocktype.nim @@ -66,6 +66,12 @@ proc `$`*(a: BlockAddress): string = else: "cid: " & $a.cid +proc cidOrTreeCid*(a: BlockAddress): Cid = + if a.leaf: + a.treeCid + else: + a.cid + proc address*(b: Block): BlockAddress = BlockAddress(leaf: false, cid: b.cid) diff --git a/tests/codex/blockexchange/testpendingblocks.nim b/tests/codex/blockexchange/testpendingblocks.nim index dd94c4da1..d3337cd3d 100644 --- a/tests/codex/blockexchange/testpendingblocks.nim +++ b/tests/codex/blockexchange/testpendingblocks.nim @@ -63,8 +63,8 @@ checksuite "Pending Blocks": discard blks.mapIt( pendingBlocks.getWantHandle( it.cid ) ) check: - blks.mapIt( $it.cid ).sorted(cmp[string]) == - toSeq(pendingBlocks.wantListBlockCids).mapIt( $it ).sorted(cmp[string]) + blks.mapIt( $BlockAddress.init(it.cid) ).sorted(cmp[string]) == + toSeq(pendingBlocks.wantList).mapIt( $it ).sorted(cmp[string]) test "Should get want handles list": let