Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup blockexchange blockAddress handling #1051

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions codex/blockexchange/engine/discovery.nim
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ type

proc discoveryQueueLoop(b: DiscoveryEngine) {.async: (raises: []).} =
while b.discEngineRunning:
for cid in toSeq(b.pendingBlocks.wantListBlockCids):
for address in toSeq(b.pendingBlocks.wantList):
let cid = address.cidOrTreeCid
try:
await b.discoveryQueue.put(cid)
except CancelledError:
Expand Down Expand Up @@ -88,10 +89,7 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async: (raises: []).} =
trace "Discovery request already in progress", cid
continue

let
haves = b.peers.peersHave(cid)

if haves.len < b.minPeersPerBlock:
if b.peers.countPeersWhoHave(cid) < b.minPeersPerBlock:
try:
let
request = b.discovery
Expand Down Expand Up @@ -135,6 +133,9 @@ proc queueFindBlocksReq*(b: DiscoveryEngine, cids: seq[Cid]) {.inline.} =
except CatchableError as exc:
warn "Exception queueing discovery request", exc = exc.msg

proc queueFindBlocksReq*(b: DiscoveryEngine, addresses: seq[BlockAddress]) {.inline.} =
b.queueFindBlocksReq(addresses.mapIt(it.cidOrTreeCid))

proc start*(b: DiscoveryEngine) {.async.} =
## Start the discengine task
##
Expand Down
25 changes: 14 additions & 11 deletions codex/blockexchange/engine/engine.nim
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@

# drop unresponsive peer
await b.network.switch.disconnect(peerId)
b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid])
b.discovery.queueFindBlocksReq(@[address])

Check warning on line 176 in codex/blockexchange/engine/engine.nim

View check run for this annotation

Codecov / codecov/patch

codex/blockexchange/engine/engine.nim#L176

Added line #L176 was not covered by tests

proc pickPseudoRandom(address: BlockAddress, peers: seq[BlockExcPeerCtx]): BlockExcPeerCtx =
return peers[hash(address) mod peers.len]
Expand All @@ -188,7 +188,7 @@
let peers = b.peers.getPeersForBlock(address)

if peers.with.len == 0:
b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid])
b.discovery.queueFindBlocksReq(@[address])
else:
let selected = pickPseudoRandom(address, peers.with)
asyncSpawn b.monitorBlockHandle(blockFuture, address, selected.id)
Expand Down Expand Up @@ -246,21 +246,24 @@
# if none of the connected peers report our wants in their have list,
# fire up discovery
b.discovery.queueFindBlocksReq(
toSeq(b.pendingBlocks.wantListCids)
.filter do(cid: Cid) -> bool:
not b.peers.anyIt( cid in it.peerHaveCids ))
toSeq(b.pendingBlocks.wantList).filterIt(b.peers.peersHave(it).len == 0)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm trusting those two things are the same....

)

proc scheduleTasks(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.async.} =
let
cids = blocksDelivery.mapIt( it.blk.cid )
addresses = blocksDelivery.mapIt( it.address )

# TODO: This code assumes p.peerWants are of type wantBlock, and will schedule
# the block-sending task. But, want might be wantHave. In this case,
# we should send a presence update. Peer is scheduled but task handler
# can only send blocks.

# schedule any new peers to provide blocks to
for p in b.peers:
for c in cids: # for each cid
# schedule a peer if it wants at least one cid
# and we have it in our local store
if c in p.peerWantsCids:
if await (c in b.localStore):
for address in addresses:
# schedule a peer if it wants at least one
if address in p.peerWants:
if await (address in b.localStore):
if b.scheduleTask(p):
trace "Task scheduled for peer", peer = p.id
else:
Expand Down
13 changes: 0 additions & 13 deletions codex/blockexchange/engine/pendingblocks.nim
Original file line number Diff line number Diff line change
Expand Up @@ -130,19 +130,6 @@ iterator wantList*(p: PendingBlocksManager): BlockAddress =
for a in p.blocks.keys:
yield a

iterator wantListBlockCids*(p: PendingBlocksManager): Cid =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this is the actual dead code?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, unused code.

for a in p.blocks.keys:
if not a.leaf:
yield a.cid

iterator wantListCids*(p: PendingBlocksManager): Cid =
var yieldedCids = initHashSet[Cid]()
for a in p.blocks.keys:
let cid = a.cidOrTreeCid
if cid notin yieldedCids:
yieldedCids.incl(cid)
yield cid

iterator wantHandles*(p: PendingBlocksManager): Future[Block] =
for v in p.blocks.values:
yield v.handle
Expand Down
6 changes: 0 additions & 6 deletions codex/blockexchange/peers/peercontext.nim
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,6 @@ type
proc peerHave*(self: BlockExcPeerCtx): seq[BlockAddress] =
toSeq(self.blocks.keys)

proc peerHaveCids*(self: BlockExcPeerCtx): HashSet[Cid] =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And this 🙂

self.blocks.keys.toSeq.mapIt(it.cidOrTreeCid).toHashSet

proc peerWantsCids*(self: BlockExcPeerCtx): HashSet[Cid] =
self.peerWants.mapIt(it.address.cidOrTreeCid).toHashSet

proc contains*(self: BlockExcPeerCtx, address: BlockAddress): bool =
address in self.blocks

Expand Down
7 changes: 2 additions & 5 deletions codex/blockexchange/peers/peerctxstore.nim
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,12 @@ func len*(self: PeerCtxStore): int =
func peersHave*(self: PeerCtxStore, address: BlockAddress): seq[BlockExcPeerCtx] =
toSeq(self.peers.values).filterIt( it.peerHave.anyIt( it == address ) )

func peersHave*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] =
toSeq(self.peers.values).filterIt( it.peerHave.anyIt( it.cidOrTreeCid == cid ) )
func countPeersWhoHave*(self: PeerCtxStore, cid: Cid): int =
self.peers.values.countIt(it.peerHave.anyIt( it.cidOrTreeCid == cid ) )

func peersWant*(self: PeerCtxStore, address: BlockAddress): seq[BlockExcPeerCtx] =
toSeq(self.peers.values).filterIt( it.peerWants.anyIt( it == address ) )

func peersWant*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] =
toSeq(self.peers.values).filterIt( it.peerWants.anyIt( it.address.cidOrTreeCid == cid ) )

proc getPeersForBlock*(self: PeerCtxStore, address: BlockAddress): PeersForBlock =
var res = PeersForBlock()
for peer in self:
Expand Down
4 changes: 2 additions & 2 deletions tests/codex/blockexchange/testpendingblocks.nim
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ checksuite "Pending Blocks":
discard blks.mapIt( pendingBlocks.getWantHandle( it.cid ) )

check:
blks.mapIt( $it.cid ).sorted(cmp[string]) ==
toSeq(pendingBlocks.wantListBlockCids).mapIt( $it ).sorted(cmp[string])
blks.mapIt( $BlockAddress.init(it.cid) ).sorted(cmp[string]) ==
toSeq(pendingBlocks.wantList).mapIt( $it ).sorted(cmp[string])

test "Should get want handles list":
let
Expand Down
Loading