fix(bitswap): wantlist overflow handling #629

Merged
merged 17 commits into from
Jul 30, 2024
Changes from 5 commits
3 changes: 2 additions & 1 deletion CHANGELOG.md
Expand Up @@ -31,7 +31,7 @@ The following emojis are used to highlight certain changes:
### Changed

- `boxo/gateway` is now tested against [gateway-conformance v6](https://github.com/ipfs/gateway-conformance/releases/tag/v0.6.0)
- `bitswap/client` supports additional tracing
- `bitswap/client` supports additional tracing

### Removed

Expand All @@ -41,6 +41,7 @@ The following emojis are used to highlight certain changes:

- `routing/http`: the `FindPeer` now returns `routing.ErrNotFound` when no addresses are found
- `routing/http`: the `FindProvidersAsync` no longer causes a goroutine buildup
- bitswap wantlist overflow handling now cancels existing entries to make room for newer entries. This fix prevents the wantlist from filling up with CIDs that the server does not have.

## [v0.20.0]

Expand Down
201 changes: 131 additions & 70 deletions bitswap/server/internal/decision/engine.go
Expand Up @@ -2,14 +2,15 @@
package decision

import (
"cmp"
"context"
"errors"
"fmt"
"math/bits"
"slices"
"sync"
"time"

"github.com/google/uuid"

wl "github.com/ipfs/boxo/bitswap/client/wantlist"
"github.com/ipfs/boxo/bitswap/internal/defaults"
bsmsg "github.com/ipfs/boxo/bitswap/message"
Expand Down Expand Up @@ -132,9 +133,9 @@
// PeerLedger is an external ledger dealing with peers and their want lists.
type PeerLedger interface {
// Wants informs the ledger that [peer.ID] wants [wl.Entry].
Wants(p peer.ID, e wl.Entry)
Wants(p peer.ID, e wl.Entry, limit int) bool

// CancelWant returns true if the [cid.Cid] is present in the wantlist of [peer.ID].
// CancelWant returns true if the [cid.Cid] was removed from the wantlist of [peer.ID].
CancelWant(p peer.ID, k cid.Cid) bool

// CancelWantWithType will not cancel WantBlock if we sent a HAVE message.
Expand Down Expand Up @@ -315,8 +316,11 @@
}
}

// WithMaxQueuedWantlistEntriesPerPeer limits how much individual entries each peer is allowed to send.
// If a peer send us more than this we will truncate newest entries.
// WithMaxQueuedWantlistEntriesPerPeer limits how many individual entries each
// peer is allowed to send. If a peer sends more than this, then the lowest
// priority entries are truncated to this limit. If there is insufficient space
// to enqueue new entries, then older existing wants with no associated blocks,
// and lower priority wants, are canceled to make room for the new wants.
func WithMaxQueuedWantlistEntriesPerPeer(count uint) Option {
return func(e *Engine) {
e.maxQueuedWantlistEntriesPerPeer = count
Expand Down Expand Up @@ -676,14 +680,12 @@
return false
}

newWorkExists := false
defer func() {
if newWorkExists {
e.signalNewWork()
}
}()

wants, cancels, denials := e.splitWantsCancelsDenials(p, m)
wants, cancels, denials, err := e.splitWantsCancelsDenials(p, m)
if err != nil {
// This is a truly broken client, let's kill the connection.
log.Warnw(err.Error(), "local", e.self, "remote", p)
return true
}

// Get block sizes
wantKs := cid.NewSet()
Expand All @@ -702,56 +704,34 @@
e.peerLedger.ClearPeerWantlist(p)
}

s := uint(e.peerLedger.WantlistSizeForPeer(p))
if wouldBe := s + uint(len(wants)); wouldBe > e.maxQueuedWantlistEntriesPerPeer {
log.Debugw("wantlist overflow", "local", e.self, "remote", p, "would be", wouldBe)
// truncate wantlist to avoid overflow
available, o := bits.Sub(e.maxQueuedWantlistEntriesPerPeer, s, 0)
if o != 0 {
available = 0
var overflow []bsmsg.Entry
if len(wants) != 0 {
filteredWants := wants[:0] // shift inplace
for _, entry := range wants {
if !e.peerLedger.Wants(p, entry.Entry, int(e.maxQueuedWantlistEntriesPerPeer)) {
// Cannot add entry because it would exceed size limit.
overflow = append(overflow, entry)
continue
}
filteredWants = append(filteredWants, entry)
}
wants = wants[:available]
// Clear truncated entries - early GC.
clear(wants[len(filteredWants):])
wants = filteredWants
}

filteredWants := wants[:0] // shift inplace

for _, entry := range wants {
if entry.Cid.Prefix().MhType == mh.IDENTITY {
// This is a truely broken client, let's kill the connection.
e.lock.Unlock()
log.Warnw("peer wants an identity CID", "local", e.self, "remote", p)
return true
}
if e.maxCidSize != 0 && uint(entry.Cid.ByteLen()) > e.maxCidSize {
// Ignore requests about CIDs that big.
continue
}

e.peerLedger.Wants(p, entry.Entry)
filteredWants = append(filteredWants, entry)
if len(overflow) != 0 {
wants = e.handleOverflow(ctx, p, wants, overflow)
}
// Clear truncated entries - early GC.
clear(wants[len(filteredWants):])

wants = filteredWants
for _, entry := range cancels {
c := entry.Cid
if c.Prefix().MhType == mh.IDENTITY {
// This is a truely broken client, let's kill the connection.
e.lock.Unlock()
log.Warnw("peer canceled an identity CID", "local", e.self, "remote", p)
return true
}
if e.maxCidSize != 0 && uint(c.ByteLen()) > e.maxCidSize {
// Ignore requests about CIDs that big.
continue
}

log.Debugw("Bitswap engine <- cancel", "local", e.self, "from", p, "cid", c)
if e.peerLedger.CancelWant(p, c) {
e.peerRequestQueue.Remove(c, p)
}
}

e.lock.Unlock()

var activeEntries []peertask.Task
Expand All @@ -761,21 +741,14 @@
// Only add the task to the queue if the requester wants a DONT_HAVE
if e.sendDontHaves && entry.SendDontHave {
c := entry.Cid

newWorkExists = true
isWantBlock := false
if entry.WantType == pb.Message_Wantlist_Block {
isWantBlock = true
}

activeEntries = append(activeEntries, peertask.Task{
Topic: c,
Priority: int(entry.Priority),
Work: bsmsg.BlockPresenceSize(c),
Data: &taskData{
BlockSize: 0,
HaveBlock: false,
IsWantBlock: isWantBlock,
IsWantBlock: entry.WantType == pb.Message_Wantlist_Block,
SendDontHave: entry.SendDontHave,
},
})
Expand All @@ -800,8 +773,6 @@
continue
}
// The block was found, add it to the queue
newWorkExists = true

isWantBlock := e.sendAsBlock(entry.WantType, blockSize)

log.Debugw("Bitswap engine: block found", "local", e.self, "from", p, "cid", c, "isWantBlock", isWantBlock)
Expand All @@ -827,19 +798,91 @@
})
}

// Push entries onto the request queue
if len(activeEntries) > 0 {
// Push entries onto the request queue and signal network that new work is ready.
if len(activeEntries) != 0 {
e.peerRequestQueue.PushTasksTruncated(e.maxQueuedWantlistEntriesPerPeer, p, activeEntries...)
e.updateMetrics()
e.signalNewWork()
}
return false
}

// handleOverflow processes incoming wants that could not be added to the peer
// ledger without exceeding the peer want limit. These are handled by trying to
// make room by canceling existing wants for which there is no block. If this
// does not make sufficient room, then any lower priority wants that have
// blocks are canceled.
func (e *Engine) handleOverflow(ctx context.Context, p peer.ID, wants, overflow []bsmsg.Entry) []bsmsg.Entry {
existingWants := e.peerLedger.WantlistForPeer(p)
// Sort wl and overflow from least to most important.
slices.SortFunc(existingWants, func(a, b wl.Entry) int {
return cmp.Compare(a.Priority, b.Priority)
})
slices.SortFunc(overflow, func(a, b bsmsg.Entry) int {
return cmp.Compare(a.Entry.Priority, b.Entry.Priority)
})
Contributor

Perhaps premature optimization (depends on how large people set their limits to and how frequently they're hit). Seems like we could spend a lot of time sorting here.

For the overflow list this is probably fine (shouldn't be that big anyway and it's related to message size), but if nodes spend significant time near the limit they'll be doing:

  1. Copy the wantlist map into a list
  2. Sort the list

For basically every message that comes in.

Can the PeerRequestQueue help us out here since it's already storing a prioritized queue of what needs to be done? It might not due to how the locking/concurrency works but could save a bunch of pain.

Contributor Author

@gammazero gammazero Jul 8, 2024

The PeerRequestQueue cannot really help out, since that maintains a heap so getting an ordered list would require iterating the heap. I do not see a much less expensive alternative, other than just clearing the peer's want list at some point, like if overflow is happening too frequently.

Maybe if overflow happens 5 times in a row for a particular peer, then clear that peer's message queue? WDYT?
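
The "clear after repeated overflow" idea floated above could be sketched roughly as follows. This is only an illustration: `overflowTracker`, `newOverflowTracker`, and `record` are hypothetical names that do not exist in boxo, peers are identified by plain strings instead of `peer.ID`, and the threshold of 5 simply mirrors the number suggested in the comment.

```go
package main

import "fmt"

// overflowTracker counts consecutive wantlist overflows per peer.
// Hypothetical helper for illustration; not part of boxo.
type overflowTracker struct {
	limit  int            // consecutive overflows tolerated before clearing
	counts map[string]int // peer identifier -> consecutive overflow count
}

func newOverflowTracker(limit int) *overflowTracker {
	return &overflowTracker{limit: limit, counts: make(map[string]int)}
}

// record notes whether the latest message from peer overflowed its wantlist.
// It returns true once the peer has overflowed limit times in a row, which is
// the point where a caller could clear that peer's wantlist. The counter is
// reset whenever it fires or whenever a message does not overflow.
func (t *overflowTracker) record(peer string, overflowed bool) bool {
	if !overflowed {
		delete(t.counts, peer)
		return false
	}
	t.counts[peer]++
	if t.counts[peer] >= t.limit {
		delete(t.counts, peer)
		return true
	}
	return false
}

func main() {
	t := newOverflowTracker(5)
	for i := 1; i <= 5; i++ {
		// Only the fifth consecutive overflow reports that a clear is due.
		fmt.Println(i, t.record("peerA", true))
	}
}
```

With a tracker like this, the sort in `handleOverflow` would still run on each overflow, but a peer that stays pinned at its limit would eventually be reset instead of paying that cost on every message.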


queuedWantKs := cid.NewSet()
for _, entry := range existingWants {
queuedWantKs.Add(entry.Cid)
}
queuedBlockSizes, err := e.bsm.getBlockSizes(ctx, queuedWantKs.Keys())
Contributor

I'm not sure I get what the idea is here and if this is necessary / if we can make this much cheaper

  • Is this meant as "is there a DONT_HAVE response queued up that we should replace".
    • While I get this it might also be overkill, and it might be fine to respect the user priority in responding with DONT_HAVEs, HAVEs, and blocks in the same way.
  • Is this meant as "I previously sent a DONT_HAVE and now this is sitting on my list as a subscription".
    • As discussed this definitely seems like something we should want to knock off our list if out of space

In either case it seems like we could add some extra data to the in-memory structs here rather than going to the blockstore to see if we have the data (and being at the mercy of whatever caching, bloom filters, etc. are used there)

Contributor Author

@gammazero gammazero Jul 8, 2024

The idea here is that there is a DONT_HAVE message queued for the peer, but it has not been sent yet and is blocking new messages from being queued for the peer. So, cancel the unsent DONT_HAVE and try to enqueue something possibly more important. Either a delayed HAVE message will replace the pending DONT_HAVE, or the peer can ask again later. This should keep messages moving, even if there is some backup sending DONT_HAVE messages to peers.

This also handles the case where a DONT_HAVE message has been sent, but is not removed from the queue. Once a message is sent, the want is removed from the message queue and peer ledger only when blocks have been sent or when block presence has been sent. If a DONT_HAVE was sent the want remains on the queue and peer ledger as a place-holder should a block arrive later, and this is stopping new wants from being accepted. This is what the 5th bullet in #527 is referring to by:

> This is because the bitswap server never cleanup entries after sending DONT_HAVE

So, in short, it handles both cases.

> it seems like we could add some extra data to the in-memory structs here rather than going to the blockstore.

Yes, the wants for which block is found can be recorded in the peer ledger so that these can be ignored in overflow handling. However, that would need to be done in every call to engine.MessageReceived, and seems less preferable than doing something more expensive only during the exceptional case.

The task queue does already have this info, but this would require locking the task queue and the peer tracker for each overflow want CID to look at. Or, this would require a new task queue API to get a list of wants with HaveBlock set to true for a given peer. This last option might be less expensive than looking at the blockstore, but I was not comfortable with that amount of new plumbing for handling this bitswap exceptional case. WDYT?
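
The first phase described in this thread (cancel existing wants that have no block, and admit the most important overflow wants in their place) can be sketched in isolation. This is a simplified model, not the engine code: `want`, `makeRoom`, and the `haveBlock` set are hypothetical stand-ins for `wl.Entry`, `handleOverflow`, and the block size lookup, and string keys stand in for CIDs.

```go
package main

import (
	"cmp"
	"fmt"
	"slices"
)

// want is a simplified stand-in for a wantlist entry; key stands in for a CID.
type want struct {
	key      string
	priority int32
}

// makeRoom swaps existing wants that have no local block for overflow wants.
// It returns the existing wants that were kept, the overflow wants that were
// accepted, and the overflow wants that still could not be placed.
func makeRoom(existing, overflow []want, haveBlock map[string]bool) (kept, accepted, remaining []want) {
	// Work on copies so the caller's slices are not reordered.
	existing = slices.Clone(existing)
	overflow = slices.Clone(overflow)

	// Sort both lists from least to most important.
	byPriority := func(a, b want) int { return cmp.Compare(a.priority, b.priority) }
	slices.SortFunc(existing, byPriority)
	slices.SortFunc(overflow, byPriority)

	for _, w := range existing {
		if len(overflow) == 0 || haveBlock[w.key] {
			kept = append(kept, w)
			continue
		}
		// w has no block: cancel it (by not keeping it) and admit the
		// highest priority overflow want, which sits at the end.
		last := overflow[len(overflow)-1]
		overflow = overflow[:len(overflow)-1]
		accepted = append(accepted, last)
	}
	return kept, accepted, overflow
}

func main() {
	existing := []want{{"a", 1}, {"b", 2}, {"c", 3}}
	overflow := []want{{"x", 5}, {"y", 9}, {"z", 1}}
	kept, accepted, remaining := makeRoom(existing, overflow, map[string]bool{"b": true})
	fmt.Println(kept, accepted, remaining)
}
```

Recording block presence in memory, as the reviewer suggests, would replace the `haveBlock` lookup here without changing the shape of the loop.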

if err != nil {
log.Info("aborting overflow processing", err)
return wants

Codecov / codecov/patch warning: added lines #L831 - L832 of bitswap/server/internal/decision/engine.go were not covered by tests.
}

// Remove entries for blocks that are not present to make room for overflow.
Contributor

Related to the above: is this about blocks that aren't present, or about subscriptions?

Note: the reason I'm pushing on the difference is that from my perspective subscriptions are much more expensive by virtue of occupying memory for an indefinite amount of time rather than a transient "while I'm sending out a response". Not sure if that's enough to justify different lists, but it's how I'm thinking in my review here (but lmk if you disagree or think I'm missing the point).

Contributor Author

@gammazero gammazero Jul 8, 2024

From the engine's perspective, I do not think there is any need to distinguish between subscription and request-response, since I think that only determines how long a peer is in the task queue/ledger.

Overall, it probably does make more sense to only do this overflow handling for subscriptions. I was thinking/hoping this would handle itself by subscriptions being the ones primarily affected in the first place and needing to do overflow handling. I think some real-world use is necessary to determine this. I will add logging that can be used to determine when overflow handling is happening.

var removed []int
for i, w := range existingWants {
if _, found := queuedBlockSizes[w.Cid]; !found {
// Cancel lowest priority dont-have.
if e.peerLedger.CancelWant(p, w.Cid) {
e.peerRequestQueue.Remove(w.Cid, p)
}
removed = append(removed, i)
// Add highest priority overflow.
lastOver := overflow[len(overflow)-1]
overflow = overflow[:len(overflow)-1]
e.peerLedger.Wants(p, lastOver.Entry, 0)
wants = append(wants, lastOver)
if len(overflow) == 0 {
break
}
}
}

// Not enough dont-haves removed. Replace existing entries, that are a
// lower priority, with overflow entries.
Contributor

In practice I expect this never to happen given that IIUC the boxo client (which is the most widely used one) just decreases the priority over time

Given that this is the case it'd be good to:

  1. Make this short-circuit cheaply: when looking at the overflow and existing lists (which have already been sorted earlier), we may see that the lowest priority in existingWants is higher than the highest in the overflow list. It doesn't have to be an explicit check, but anything that allows this to be pretty cheap rather than linear.
  2. Have it still work (and be tested) when people choose different priorities
  3. Not being particularly expensive even in pathological cases

Contributor Author

@gammazero gammazero Jul 5, 2024

I also think it probably will be unlikely to happen given that the client decreases priority over time. The thinking here was looking more toward a future where priority may be set by path distance, where items closer to a DAG root have a higher priority. In that case it seems more likely that as new wants are requested that some do have a high priority because they are root or closer to root items.

  1. Revised logic to break out as soon as lowest usable priority is hit on incoming wantlist (now ordered from most to least important).
  2. Still works.
  3. In the normal case, most of the work was already done in sorting the lists, so this compares list items until the priority in the ascending-sorted list is higher than the priority in the descending-sorted list. The pathological case is where all new incoming wants are at a higher priority. This can be detected, but there is not a better way to handle it since canceling the individual overflows needs to be done. Clearing the peer's wantlist does the same thing, just for all CIDs, so that is not better.
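
The revised comparison described in the reply above (existing wants sorted ascending, overflow sorted descending, stopping at the first overflow want that cannot displace anything) might look roughly like this. It is a sketch under simplifying assumptions: `want` and `replaceLower` are hypothetical names, string keys stand in for CIDs, and the bookkeeping for entries already removed in the first phase is left out.

```go
package main

import (
	"cmp"
	"fmt"
	"slices"
)

// want is a simplified stand-in for a wantlist entry; key stands in for a CID.
type want struct {
	key      string
	priority int32
}

// replaceLower pairs the most important overflow wants with the least
// important existing wants. It returns the existing wants to cancel and the
// overflow wants accepted in their place.
func replaceLower(existing, overflow []want) (canceled, accepted []want) {
	existing = slices.Clone(existing)
	overflow = slices.Clone(overflow)

	// Existing: least important first. Overflow: most important first.
	slices.SortFunc(existing, func(a, b want) int { return cmp.Compare(a.priority, b.priority) })
	slices.SortFunc(overflow, func(a, b want) int { return cmp.Compare(b.priority, a.priority) })

	i := 0
	for _, o := range overflow {
		// Every later overflow want is no more important than o, and every
		// later existing want is at least as important as existing[i], so
		// once o cannot replace existing[i] nothing else can either.
		if i == len(existing) || o.priority <= existing[i].priority {
			break
		}
		canceled = append(canceled, existing[i])
		accepted = append(accepted, o)
		i++
	}
	return canceled, accepted
}

func main() {
	existing := []want{{"e5", 5}, {"e1", 1}, {"e3", 3}}
	overflow := []want{{"o4", 4}, {"o2", 2}, {"o6", 6}}
	canceled, accepted := replaceLower(existing, overflow)
	fmt.Println(canceled, accepted)
}
```

When a client only ever lowers priority over time, the first comparison fails and the loop exits immediately, which is the cheap common case the reviewer asked for.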

var replace int
for _, overflowEnt := range overflow {
// Do not compare with removed existingWants entry.
for len(removed) != 0 && replace == removed[0] {
replace++
removed = removed[1:]
}
if overflowEnt.Entry.Priority <= existingWants[replace].Priority {
// Everything in existingWants is equal or more important, so this
// overflow entry cannot replace any existing wants.
continue
}
entCid := existingWants[replace].Cid
replace++
if e.peerLedger.CancelWant(p, entCid) {
e.peerRequestQueue.Remove(entCid, p)
}
e.peerLedger.Wants(p, overflowEnt.Entry, 0)
wants = append(wants, overflowEnt)
}

return wants
}

// Split the want-have and want-block entries from the cancel and deny entries.
func (e *Engine) splitWantsCancelsDenials(p peer.ID, m bsmsg.BitSwapMessage) ([]bsmsg.Entry, []bsmsg.Entry, []bsmsg.Entry) {
func (e *Engine) splitWantsCancelsDenials(p peer.ID, m bsmsg.BitSwapMessage) ([]bsmsg.Entry, []bsmsg.Entry, []bsmsg.Entry, error) {
entries := m.Wantlist() // creates copy; safe to modify
if len(entries) == 0 {
return nil, nil, nil
return nil, nil, nil, nil
}

log.Debugw("Bitswap engine <- msg", "local", e.self, "from", p, "entryCount", len(entries))
Expand All @@ -848,18 +891,27 @@
var cancels, denials []bsmsg.Entry

for _, et := range entries {
c := et.Cid
if e.maxCidSize != 0 && uint(c.ByteLen()) > e.maxCidSize {
// Ignore requests about CIDs that big.
continue
}
if c.Prefix().MhType == mh.IDENTITY {
return nil, nil, nil, errors.New("peer canceled an identity CID")
}

if et.Cancel {
cancels = append(cancels, et)
continue
}

if et.WantType == pb.Message_Wantlist_Have {
log.Debugw("Bitswap engine <- want-have", "local", e.self, "from", p, "cid", et.Cid)
log.Debugw("Bitswap engine <- want-have", "local", e.self, "from", p, "cid", c)
} else {
log.Debugw("Bitswap engine <- want-block", "local", e.self, "from", p, "cid", et.Cid)
log.Debugw("Bitswap engine <- want-block", "local", e.self, "from", p, "cid", c)
}

if e.peerBlockRequestFilter != nil && !e.peerBlockRequestFilter(p, et.Cid) {
if e.peerBlockRequestFilter != nil && !e.peerBlockRequestFilter(p, c) {
denials = append(denials, et)
continue
}
Expand All @@ -871,10 +923,19 @@
wants = nil
}

// Do not take more wants than can be handled.
if len(wants) > int(e.maxQueuedWantlistEntriesPerPeer) {
// Keep the highest priority wants.
slices.SortFunc(wants, func(a, b bsmsg.Entry) int {
return cmp.Compare(b.Entry.Priority, a.Entry.Priority)
})
wants = wants[:int(e.maxQueuedWantlistEntriesPerPeer)]
Contributor

This might be fine, however do we want to do this here given that there could be duplicates once we actually look at the wantlist?

  • Pro: We shed ram usage earlier
  • Con: There might be a better set of wants that we can honor if we wait

Contributor Author

@gammazero gammazero Jul 5, 2024

I think it makes sense to truncate the list, but sorting can be avoided.

Truncation makes sense:
The incoming wants are already unique, so even if there are no existing wants, or all possible wants have duplicates, there is still no way any more than the limit can be used. If all those not added to the message queue are included in the overflow then they will end up getting dropped anyway because there will not be enough existing wants available to replace. Dropping them early does two things:

  • Prevents trying to add excessive number of wants to peer ledger.
  • Prevents a large number of wants from being sorted and incurring block size lookup in handleOverflow.

Truncation without sorting:
This will potentially lose higher priority wants, but avoids sorting an incoming wantlist of unknown size. In the usual case sorting is not needed anyway, so not sorting will avoid a performance hit.

> There might be a better set of wants that we can honor if we wait

True, but then it is necessary to examine all the wants (a possibly excessive amount) to see if they can be added directly to the message queue, or need to be handled as overflow.

}

// Clear truncated entries.
clear(entries[len(wants):])

return wants, cancels, denials
return wants, cancels, denials, nil
}

// ReceivedBlocks is called when new blocks are received from the network.