From a06156c7d19cd8882da0b87bef777fa9fe9db162 Mon Sep 17 00:00:00 2001
From: gammazero
Date: Fri, 21 Jun 2024 01:22:28 -0700
Subject: [PATCH 01/13] Fix wantlist overflow handling to select newer entries.

wantlist overflow handling now cancels existing entries to make room for
newer requests. This fix prevents the wantlist from filling up with CIDs
that the server does not have.

Fixes #527
---
 CHANGELOG.md                                 |  3 +-
 bitswap/server/internal/decision/engine.go   | 89 +++++++++++++------
 .../server/internal/decision/engine_test.go  | 58 ++++++++++++
 .../server/internal/decision/peer_ledger.go  |  3 +-
 4 files changed, 123 insertions(+), 30 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 85176369f..2addb5a44 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -31,7 +31,7 @@ The following emojis are used to highlight certain changes:
 ### Changed
 
 - `boxo/gateway` is now tested against [gateway-conformance v6](https://github.com/ipfs/gateway-conformance/releases/tag/v0.6.0)
-- `bitswap/client` supports additional tracing 
+- `bitswap/client` supports additional tracing
 
 ### Removed
 
@@ -41,6 +41,7 @@ The following emojis are used to highlight certain changes:
 
 - `routing/http`: the `FindPeer` now returns `routing.ErrNotFound` when no addresses are found
 - `routing/http`: the `FindProvidersAsync` no longer causes a goroutine buildup
+- bitswap wantlist overflow handling now cancels existing entries to make room for newer entries. This fix prevents the wantlist from filling up with CIDs that the server does not have.
 
 ## [v0.20.0]
 
diff --git a/bitswap/server/internal/decision/engine.go b/bitswap/server/internal/decision/engine.go
index bc934db5a..564e27ad6 100644
--- a/bitswap/server/internal/decision/engine.go
+++ b/bitswap/server/internal/decision/engine.go
@@ -4,7 +4,6 @@ package decision
 import (
 	"context"
 	"fmt"
-	"math/bits"
 	"sync"
 	"time"
 
@@ -134,7 +133,7 @@ type PeerLedger interface {
 	// Wants informs the ledger that [peer.ID] wants [wl.Entry].
 	Wants(p peer.ID, e wl.Entry)
 
-	// CancelWant returns true if the [cid.Cid] is present in the wantlist of [peer.ID].
+	// CancelWant returns true if the [cid.Cid] was removed from the wantlist of [peer.ID].
 	CancelWant(p peer.ID, k cid.Cid) bool
 
 	// CancelWantWithType will not cancel WantBlock if we sent a HAVE message.
@@ -702,38 +701,72 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
 		e.peerLedger.ClearPeerWantlist(p)
 	}
 
-	s := uint(e.peerLedger.WantlistSizeForPeer(p))
-	if wouldBe := s + uint(len(wants)); wouldBe > e.maxQueuedWantlistEntriesPerPeer {
-		log.Debugw("wantlist overflow", "local", e.self, "remote", p, "would be", wouldBe)
-		// truncate wantlist to avoid overflow
-		available, o := bits.Sub(e.maxQueuedWantlistEntriesPerPeer, s, 0)
-		if o != 0 {
-			available = 0
+	if len(wants) != 0 {
+		filteredWants := wants[:0] // shift inplace
+		for _, entry := range wants {
+			if entry.Cid.Prefix().MhType == mh.IDENTITY {
+				// This is a truely broken client, let's kill the connection.
+				e.lock.Unlock()
+				log.Warnw("peer wants an identity CID", "local", e.self, "remote", p)
+				return true
+			}
+			if e.maxCidSize != 0 && uint(entry.Cid.ByteLen()) > e.maxCidSize {
+				// Ignore requests about CIDs that big.
+				continue
+			}
+			filteredWants = append(filteredWants, entry)
+			if len(filteredWants) == int(e.maxQueuedWantlistEntriesPerPeer) {
+				// filteredWants at limit, ignore remaining wants from request.
+				log.Debugw("requested wants exceeds max wantlist size", "local", e.self, "remote", p, "ignoring", len(wants)-len(filteredWants))
+				break
+			}
 		}
-		wants = wants[:available]
-	}
-
-	filteredWants := wants[:0] // shift inplace
-
-	for _, entry := range wants {
-		if entry.Cid.Prefix().MhType == mh.IDENTITY {
-			// This is a truely broken client, let's kill the connection.
-			e.lock.Unlock()
-			log.Warnw("peer wants an identity CID", "local", e.self, "remote", p)
-			return true
+		wants = wants[len(filteredWants):]
+		for i := range wants {
+			wants[i] = bsmsg.Entry{} // early GC
 		}
-		if e.maxCidSize != 0 && uint(entry.Cid.ByteLen()) > e.maxCidSize {
-			// Ignore requests about CIDs that big.
-			continue
+		wants = filteredWants
+
+		// Ensure sufficient space for new wants.
+		s := e.peerLedger.WantlistSizeForPeer(p)
+		available := int(e.maxQueuedWantlistEntriesPerPeer) - s
+		if len(wants) > available {
+			needSpace := len(wants) - available
+			log.Debugw("wantlist overflow", "local", e.self, "remote", p, "would be", s+len(wants), "canceling", needSpace)
+			// Cancel any wants that are being requested again. This makes room
+			// for new wants and minimizes that existing wants to cancel that
+			// are not in the new request.
+			for _, entry := range wants {
+				if e.peerLedger.CancelWant(p, entry.Cid) {
+					e.peerRequestQueue.Remove(entry.Cid, p)
+					needSpace--
+					if needSpace == 0 {
+						break
+					}
+				}
+			}
+			// Cancel additional wants, that are not being replaced, to make
+			// room for new wants.
+			if needSpace != 0 {
+				wl := e.peerLedger.WantlistForPeer(p)
+				for i := range wl {
+					entCid := wl[i].Cid
+					if e.peerLedger.CancelWant(p, entCid) {
+						e.peerRequestQueue.Remove(entCid, p)
+						needSpace--
+						if needSpace == 0 {
+							break
+						}
+					}
+				}
+			}
 		}
-		e.peerLedger.Wants(p, entry.Entry)
-		filteredWants = append(filteredWants, entry)
+		for _, entry := range wants {
+			e.peerLedger.Wants(p, entry.Entry)
+		}
 	}
-	// Clear truncated entries - early GC.
-	clear(wants[len(filteredWants):])
-	wants = filteredWants
 
 	for _, entry := range cancels {
 		c := entry.Cid
 		if c.Prefix().MhType == mh.IDENTITY {
diff --git a/bitswap/server/internal/decision/engine_test.go b/bitswap/server/internal/decision/engine_test.go
index c25e3508d..a9595968d 100644
--- a/bitswap/server/internal/decision/engine_test.go
+++ b/bitswap/server/internal/decision/engine_test.go
@@ -1733,3 +1733,61 @@ func TestKillConnectionForInlineCid(t *testing.T) {
 		t.Fatal("connection was not killed when receiving inline in cancel")
 	}
 }
+
+func TestWantlistOverflow(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	const limit = 32
+	warsaw := newTestEngine(ctx, "warsaw", WithMaxQueuedWantlistEntriesPerPeer(limit))
+	riga := newTestEngine(ctx, "riga")
+
+	m := message.New(false)
+	for i := 0; i < limit+(limit/2); i++ {
+		m.AddEntry(blocks.NewBlock([]byte(fmt.Sprint(i))).Cid(), 0, pb.Message_Wantlist_Block, true)
+	}
+	warsaw.Engine.MessageReceived(ctx, riga.Peer, m)
+
+	if warsaw.Peer == riga.Peer {
+		t.Fatal("Sanity Check: Peers have same Key!")
+	}
+
+	// Check that the wantlist is at the size limit, and limit/2 wants ignored.
+	wl := warsaw.Engine.WantlistForPeer(riga.Peer)
+	if len(wl) != limit {
+		t.Fatal("wantlist does not match limit", len(wl))
+	}
+
+	m = message.New(false)
+	blockCids := make([]cid.Cid, limit/2+4)
+	for i := 0; i < limit/2+4; i++ {
+		c := blocks.NewBlock([]byte(fmt.Sprint(i + limit))).Cid()
+		m.AddEntry(c, 0, pb.Message_Wantlist_Block, true)
+		blockCids[i] = c
+	}
+	warsaw.Engine.MessageReceived(ctx, riga.Peer, m)
+	wl = warsaw.Engine.WantlistForPeer(riga.Peer)
+
+	// Check that wantlist is still at size limit.
+	if len(wl) != limit {
+		t.Fatalf("wantlist size %d does not match limit %d", len(wl), limit)
+	}
+
+	// Check that all new blocks are in wantlist.
+	var missing int
+	for _, c := range blockCids {
+		var found bool
+		for i := range wl {
+			if wl[i].Cid == c {
+				found = true
+				break
+			}
+		}
+		if !found {
+			missing++
+		}
+	}
+	if missing != 0 {
+		t.Fatalf("Missing %d new wants expected in wantlist", missing)
+	}
+}
diff --git a/bitswap/server/internal/decision/peer_ledger.go b/bitswap/server/internal/decision/peer_ledger.go
index b79db226d..714e28d42 100644
--- a/bitswap/server/internal/decision/peer_ledger.go
+++ b/bitswap/server/internal/decision/peer_ledger.go
@@ -42,13 +42,14 @@ func (l *DefaultPeerLedger) CancelWant(p peer.ID, k cid.Cid) bool {
 	if !ok {
 		return false
 	}
+	_, had := wants[k]
 	delete(wants, k)
 	if len(wants) == 0 {
 		delete(l.peers, p)
 	}
 	l.removePeerFromCid(p, k)
 
-	return true
+	return had
 }
 
 func (l *DefaultPeerLedger) CancelWantWithType(p peer.ID, k cid.Cid, typ pb.Message_Wantlist_WantType) {

From 9c35f1841e508a95a6d6d15f6a1f06cf82bd6fd9 Mon Sep 17 00:00:00 2001
From: gammazero <11790789+gammazero@users.noreply.github.com>
Date: Wed, 3 Jul 2024 11:19:12 -0700
Subject: [PATCH 02/13] closer

---
 bitswap/server/internal/decision/engine.go    | 197 ++++++++++--------
 .../server/internal/decision/engine_test.go   |  99 ++++++---
 .../server/internal/decision/peer_ledger.go   |  11 +-
 3 files changed, 191 insertions(+), 116 deletions(-)

diff --git a/bitswap/server/internal/decision/engine.go b/bitswap/server/internal/decision/engine.go
index 564e27ad6..4d5b9fbda 100644
--- a/bitswap/server/internal/decision/engine.go
+++ b/bitswap/server/internal/decision/engine.go
@@ -2,13 +2,15 @@
 package decision
 
 import (
+	"cmp"
 	"context"
+	"errors"
 	"fmt"
+	"slices"
 	"sync"
 	"time"
 
 	"github.com/google/uuid"
-
 	wl "github.com/ipfs/boxo/bitswap/client/wantlist"
 	"github.com/ipfs/boxo/bitswap/internal/defaults"
 	bsmsg "github.com/ipfs/boxo/bitswap/message"
@@ -131,7 +133,7 @@ type PeerEntry struct {
 // PeerLedger is an external ledger dealing with peers and their want lists.
 type PeerLedger interface {
 	// Wants informs the ledger that [peer.ID] wants [wl.Entry].
-	Wants(p peer.ID, e wl.Entry)
+	Wants(p peer.ID, e wl.Entry, limit int) bool
 
 	// CancelWant returns true if the [cid.Cid] was removed from the wantlist of [peer.ID].
 	CancelWant(p peer.ID, k cid.Cid) bool
@@ -675,14 +677,12 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
 		return false
 	}
 
-	newWorkExists := false
-	defer func() {
-		if newWorkExists {
-			e.signalNewWork()
-		}
-	}()
-
-	wants, cancels, denials := e.splitWantsCancelsDenials(p, m)
+	wants, cancels, denials, err := e.splitWantsCancelsDenials(p, m)
+	if err != nil {
+		// This is a truely broken client, let's kill the connection.
+ log.Warnw(err.Error(), "local", e.self, "remote", p) + return true + } // Get block sizes wantKs := cid.NewSet() @@ -701,90 +701,59 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap e.peerLedger.ClearPeerWantlist(p) } + var overflow []bsmsg.Entry if len(wants) != 0 { filteredWants := wants[:0] // shift inplace for _, entry := range wants { - if entry.Cid.Prefix().MhType == mh.IDENTITY { - // This is a truely broken client, let's kill the connection. - e.lock.Unlock() - log.Warnw("peer wants an identity CID", "local", e.self, "remote", p) - return true - } - if e.maxCidSize != 0 && uint(entry.Cid.ByteLen()) > e.maxCidSize { - // Ignore requests about CIDs that big. + if !e.peerLedger.Wants(p, entry.Entry, int(e.maxQueuedWantlistEntriesPerPeer)) { + // Cannot add entry because it would exceed size limit. + overflow = append(overflow, entry) continue } filteredWants = append(filteredWants, entry) - if len(filteredWants) == int(e.maxQueuedWantlistEntriesPerPeer) { - // filteredWants at limit, ignore remaining wants from request. - log.Debugw("requested wants exceeds max wantlist size", "local", e.self, "remote", p, "ignoring", len(wants)-len(filteredWants)) - break - } - } - wants = wants[len(filteredWants):] - for i := range wants { - wants[i] = bsmsg.Entry{} // early GC } + // Clear truncated entries - early GC. + clear(wants[len(filteredWants):]) wants = filteredWants + } - // Ensure sufficient space for new wants. - s := e.peerLedger.WantlistSizeForPeer(p) - available := int(e.maxQueuedWantlistEntriesPerPeer) - s - if len(wants) > available { - needSpace := len(wants) - available - log.Debugw("wantlist overflow", "local", e.self, "remote", p, "would be", s+len(wants), "canceling", needSpace) - // Cancel any wants that are being requested again. This makes room - // for new wants and minimizes that existing wants to cancel that - // are not in the new request. - for _, entry := range wants { - if e.peerLedger.CancelWant(p, entry.Cid) { - e.peerRequestQueue.Remove(entry.Cid, p) - needSpace-- - if needSpace == 0 { - break - } - } + if len(overflow) != 0 { + // Sort wl and overflow from least to most important. + peerWants := e.peerLedger.WantlistForPeer(p) + slices.SortFunc(peerWants, func(a, b wl.Entry) int { + return cmp.Compare(a.Priority, b.Priority) + }) + slices.SortFunc(overflow, func(a, b bsmsg.Entry) int { + return cmp.Compare(a.Entry.Priority, b.Entry.Priority) + }) + + // Put overflow wants onto the request queue by replacing entries that + // have the same or lower priority. + var replace int + for _, entry := range overflow { + if entry.Entry.Priority <= peerWants[replace].Priority { + // Everything in peerWants is equal or more improtant, so this + // overflow entry cannot replace any existing wants. + continue } - // Cancel additional wants, that are not being replaced, to make - // room for new wants. 
- if needSpace != 0 { - wl := e.peerLedger.WantlistForPeer(p) - for i := range wl { - entCid := wl[i].Cid - if e.peerLedger.CancelWant(p, entCid) { - e.peerRequestQueue.Remove(entCid, p) - needSpace-- - if needSpace == 0 { - break - } - } - } + entCid := peerWants[replace].Cid + replace++ + if e.peerLedger.CancelWant(p, entCid) { + e.peerRequestQueue.Remove(entCid, p) } - } - - for _, entry := range wants { - e.peerLedger.Wants(p, entry.Entry) + e.peerLedger.Wants(p, entry.Entry, int(e.maxQueuedWantlistEntriesPerPeer)) + wants = append(wants, entry) } } for _, entry := range cancels { c := entry.Cid - if c.Prefix().MhType == mh.IDENTITY { - // This is a truely broken client, let's kill the connection. - e.lock.Unlock() - log.Warnw("peer canceled an identity CID", "local", e.self, "remote", p) - return true - } - if e.maxCidSize != 0 && uint(c.ByteLen()) > e.maxCidSize { - // Ignore requests about CIDs that big. - continue - } - log.Debugw("Bitswap engine <- cancel", "local", e.self, "from", p, "cid", c) if e.peerLedger.CancelWant(p, c) { e.peerRequestQueue.Remove(c, p) } } + e.lock.Unlock() var activeEntries []peertask.Task @@ -795,7 +764,6 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap if e.sendDontHaves && entry.SendDontHave { c := entry.Cid - newWorkExists = true isWantBlock := false if entry.WantType == pb.Message_Wantlist_Block { isWantBlock = true @@ -833,8 +801,6 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap continue } // The block was found, add it to the queue - newWorkExists = true - isWantBlock := e.sendAsBlock(entry.WantType, blockSize) log.Debugw("Bitswap engine: block found", "local", e.self, "from", p, "cid", c, "isWantBlock", isWantBlock) @@ -860,19 +826,64 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap }) } - // Push entries onto the request queue - if len(activeEntries) > 0 { + // Push entries onto the request queue and signal network that new work is ready. + if len(activeEntries) != 0 { e.peerRequestQueue.PushTasksTruncated(e.maxQueuedWantlistEntriesPerPeer, p, activeEntries...) e.updateMetrics() + e.signalNewWork() } return false } +/* + + // Ensure sufficient space for new wants. + s := e.peerLedger.WantlistSizeForPeer(p) + available := int(e.maxQueuedWantlistEntriesPerPeer) - s + if len(wants) > available { + needSpace := len(wants) - available + log.Debugw("wantlist overflow", "local", e.self, "remote", p, "would be", s+len(wants), "canceling", needSpace) + // Cancel any wants that are being requested again. This makes room + // for new wants and minimizes that existing wants to cancel that + // are not in the new request. + for _, entry := range wants { + if e.peerLedger.CancelWant(p, entry.Cid) { + e.peerRequestQueue.Remove(entry.Cid, p) + needSpace-- + if needSpace == 0 { + break + } + } + } + // Cancel additional wants, that are not being replaced, to make + // room for new wants. + if needSpace != 0 { + wl := e.peerLedger.WantlistForPeer(p) + for i := range wl { + entCid := wl[i].Cid + if e.peerLedger.CancelWant(p, entCid) { + e.peerRequestQueue.Remove(entCid, p) + needSpace-- + if needSpace == 0 { + break + } + } + } + } + } + + for _, entry := range wants { + e.peerLedger.Wants(p, entry.Entry) + } + } + +*/ + // Split the want-havek entries from the cancel and deny entries. 
-func (e *Engine) splitWantsCancelsDenials(p peer.ID, m bsmsg.BitSwapMessage) ([]bsmsg.Entry, []bsmsg.Entry, []bsmsg.Entry) { +func (e *Engine) splitWantsCancelsDenials(p peer.ID, m bsmsg.BitSwapMessage) ([]bsmsg.Entry, []bsmsg.Entry, []bsmsg.Entry, error) { entries := m.Wantlist() // creates copy; safe to modify if len(entries) == 0 { - return nil, nil, nil + return nil, nil, nil, nil } log.Debugw("Bitswap engine <- msg", "local", e.self, "from", p, "entryCount", len(entries)) @@ -881,18 +892,27 @@ func (e *Engine) splitWantsCancelsDenials(p peer.ID, m bsmsg.BitSwapMessage) ([] var cancels, denials []bsmsg.Entry for _, et := range entries { + c := et.Cid + if e.maxCidSize != 0 && uint(c.ByteLen()) > e.maxCidSize { + // Ignore requests about CIDs that big. + continue + } + if c.Prefix().MhType == mh.IDENTITY { + return nil, nil, nil, errors.New("peer canceled an identity CID") + } + if et.Cancel { cancels = append(cancels, et) continue } if et.WantType == pb.Message_Wantlist_Have { - log.Debugw("Bitswap engine <- want-have", "local", e.self, "from", p, "cid", et.Cid) + log.Debugw("Bitswap engine <- want-have", "local", e.self, "from", p, "cid", c) } else { - log.Debugw("Bitswap engine <- want-block", "local", e.self, "from", p, "cid", et.Cid) + log.Debugw("Bitswap engine <- want-block", "local", e.self, "from", p, "cid", c) } - if e.peerBlockRequestFilter != nil && !e.peerBlockRequestFilter(p, et.Cid) { + if e.peerBlockRequestFilter != nil && !e.peerBlockRequestFilter(p, c) { denials = append(denials, et) continue } @@ -904,10 +924,19 @@ func (e *Engine) splitWantsCancelsDenials(p peer.ID, m bsmsg.BitSwapMessage) ([] wants = nil } + // Do not take more wants that can be handled. + if len(wants) > int(e.maxQueuedWantlistEntriesPerPeer) { + // Keep the highest priority wants. + slices.SortFunc(wants, func(a, b bsmsg.Entry) int { + return cmp.Compare(b.Entry.Priority, a.Entry.Priority) + }) + wants = wants[:int(e.maxQueuedWantlistEntriesPerPeer)] + } + // Clear truncated entries. clear(entries[len(wants):]) - return wants, cancels, denials + return wants, cancels, denials, nil } // ReceivedBlocks is called when new blocks are received from the network. 
diff --git a/bitswap/server/internal/decision/engine_test.go b/bitswap/server/internal/decision/engine_test.go index a9595968d..b9b566a1a 100644 --- a/bitswap/server/internal/decision/engine_test.go +++ b/bitswap/server/internal/decision/engine_test.go @@ -14,6 +14,7 @@ import ( "time" "github.com/benbjohnson/clock" + wl "github.com/ipfs/boxo/bitswap/client/wantlist" "github.com/ipfs/boxo/bitswap/internal/testutil" message "github.com/ipfs/boxo/bitswap/message" pb "github.com/ipfs/boxo/bitswap/message/pb" @@ -1739,55 +1740,91 @@ func TestWantlistOverflow(t *testing.T) { defer cancel() const limit = 32 + warsaw := newTestEngine(ctx, "warsaw", WithMaxQueuedWantlistEntriesPerPeer(limit)) riga := newTestEngine(ctx, "riga") + if warsaw.Peer == riga.Peer { + t.Fatal("Sanity Check: Peers have same Key!") + } + var blockNum int m := message.New(false) - for i := 0; i < limit+(limit/2); i++ { - m.AddEntry(blocks.NewBlock([]byte(fmt.Sprint(i))).Cid(), 0, pb.Message_Wantlist_Block, true) + for blockNum < limit { + m.AddEntry(blocks.NewBlock([]byte(fmt.Sprint(blockNum))).Cid(), 1, pb.Message_Wantlist_Block, true) + blockNum++ + } + lowPrioCids := make([]cid.Cid, 0, 5) + for blockNum < cap(lowPrioCids) { + c := blocks.NewBlock([]byte(fmt.Sprint(blockNum))).Cid() + blockNum++ + m.AddEntry(c, 0, pb.Message_Wantlist_Block, true) + lowPrioCids = append(lowPrioCids, c) } - warsaw.Engine.MessageReceived(ctx, riga.Peer, m) - - if warsaw.Peer == riga.Peer { - t.Fatal("Sanity Check: Peers have same Key!") + highPrioCids := make([]cid.Cid, 0, 5) + for blockNum < cap(highPrioCids) { + c := blocks.NewBlock([]byte(fmt.Sprint(blockNum))).Cid() + blockNum++ + m.AddEntry(c, 10, pb.Message_Wantlist_Block, true) + lowPrioCids = append(highPrioCids, c) } + warsaw.Engine.MessageReceived(ctx, riga.Peer, m) - // Check that the wantlist is at the size limit, and limit/2 wants ignored. + // Check that the wantlist is at the size limit. wl := warsaw.Engine.WantlistForPeer(riga.Peer) if len(wl) != limit { - t.Fatal("wantlist does not match limit", len(wl)) + t.Fatal("wantlist size", len(wl), "does not match limit", limit) + } + + // Check that low priority entries not on wantlist. + for _, c := range lowPrioCids { + if findCid(c, wl) { + t.Fatal("low priority entry should not be on wantlist") + } + } + // Check that high priority entries are all on wantlist. + for _, c := range highPrioCids { + if !findCid(c, wl) { + t.Fatal("expected high priority entry on wantlist") + } } m = message.New(false) - blockCids := make([]cid.Cid, limit/2+4) - for i := 0; i < limit/2+4; i++ { - c := blocks.NewBlock([]byte(fmt.Sprint(i + limit))).Cid() + + lowPrioCids = lowPrioCids[:0] + for blockNum < cap(lowPrioCids) { + c := blocks.NewBlock([]byte(fmt.Sprint(blockNum))).Cid() + blockNum++ m.AddEntry(c, 0, pb.Message_Wantlist_Block, true) - blockCids[i] = c + lowPrioCids = append(lowPrioCids, c) } - warsaw.Engine.MessageReceived(ctx, riga.Peer, m) - wl = warsaw.Engine.WantlistForPeer(riga.Peer) - - // Check that wantlist is still at size limit. - if len(wl) != limit { - t.Fatalf("wantlist size %d does not match limit %d", len(wl), limit) + highPrioCids = highPrioCids[:0] + for blockNum < cap(highPrioCids) { + c := blocks.NewBlock([]byte(fmt.Sprint(blockNum))).Cid() + blockNum++ + m.AddEntry(c, 10, pb.Message_Wantlist_Block, true) + lowPrioCids = append(highPrioCids, c) } + warsaw.Engine.MessageReceived(ctx, riga.Peer, m) - // Check that all new blocks are in wantlist. 
- var missing int - for _, c := range blockCids { - var found bool - for i := range wl { - if wl[i].Cid == c { - found = true - break - } + // Check that low priority entries not on wantlist. + for _, c := range lowPrioCids { + if findCid(c, wl) { + t.Fatal("low priority entry should not be on wantlist") } - if !found { - missing++ + } + // Check that high priority entries are all on wantlist. + for _, c := range highPrioCids { + if !findCid(c, wl) { + t.Fatal("expected high priority entry on wantlist") } } - if missing != 0 { - t.Fatalf("Missing %d new wants expected in wantlist", missing) +} + +func findCid(c cid.Cid, wantList []wl.Entry) bool { + for i := range wantList { + if wantList[i].Cid == c { + return true + } } + return false } diff --git a/bitswap/server/internal/decision/peer_ledger.go b/bitswap/server/internal/decision/peer_ledger.go index 714e28d42..52d610ae8 100644 --- a/bitswap/server/internal/decision/peer_ledger.go +++ b/bitswap/server/internal/decision/peer_ledger.go @@ -21,11 +21,18 @@ func NewDefaultPeerLedger() *DefaultPeerLedger { } } -func (l *DefaultPeerLedger) Wants(p peer.ID, e wl.Entry) { +// Wants adds an entry to the peer ledger. If adding the entry would make the +// peer ledger exceed the size limit, then the entry is not added and false is +// returned. +func (l *DefaultPeerLedger) Wants(p peer.ID, e wl.Entry, limit int) bool { cids, ok := l.peers[p] if !ok { cids = make(map[cid.Cid]entry) l.peers[p] = cids + } else if len(cids) == limit { + if _, ok = cids[e.Cid]; !ok { + return false // cannot add to peer ledger + } } cids[e.Cid] = entry{e.Priority, e.WantType} @@ -35,6 +42,8 @@ func (l *DefaultPeerLedger) Wants(p peer.ID, e wl.Entry) { l.cids[e.Cid] = m } m[p] = entry{e.Priority, e.WantType} + + return true } func (l *DefaultPeerLedger) CancelWant(p peer.ID, k cid.Cid) bool { From eaf6667f82cd102f42357439103e000baa463988 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Wed, 3 Jul 2024 11:42:01 -0700 Subject: [PATCH 03/13] ignore 0 limit on peerledger wants --- bitswap/server/internal/decision/engine.go | 46 +------------------ .../server/internal/decision/peer_ledger.go | 4 +- 2 files changed, 3 insertions(+), 47 deletions(-) diff --git a/bitswap/server/internal/decision/engine.go b/bitswap/server/internal/decision/engine.go index 4d5b9fbda..2362eb93e 100644 --- a/bitswap/server/internal/decision/engine.go +++ b/bitswap/server/internal/decision/engine.go @@ -741,7 +741,7 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap if e.peerLedger.CancelWant(p, entCid) { e.peerRequestQueue.Remove(entCid, p) } - e.peerLedger.Wants(p, entry.Entry, int(e.maxQueuedWantlistEntriesPerPeer)) + e.peerLedger.Wants(p, entry.Entry, 0) wants = append(wants, entry) } } @@ -835,50 +835,6 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap return false } -/* - - // Ensure sufficient space for new wants. - s := e.peerLedger.WantlistSizeForPeer(p) - available := int(e.maxQueuedWantlistEntriesPerPeer) - s - if len(wants) > available { - needSpace := len(wants) - available - log.Debugw("wantlist overflow", "local", e.self, "remote", p, "would be", s+len(wants), "canceling", needSpace) - // Cancel any wants that are being requested again. This makes room - // for new wants and minimizes that existing wants to cancel that - // are not in the new request. 
- for _, entry := range wants { - if e.peerLedger.CancelWant(p, entry.Cid) { - e.peerRequestQueue.Remove(entry.Cid, p) - needSpace-- - if needSpace == 0 { - break - } - } - } - // Cancel additional wants, that are not being replaced, to make - // room for new wants. - if needSpace != 0 { - wl := e.peerLedger.WantlistForPeer(p) - for i := range wl { - entCid := wl[i].Cid - if e.peerLedger.CancelWant(p, entCid) { - e.peerRequestQueue.Remove(entCid, p) - needSpace-- - if needSpace == 0 { - break - } - } - } - } - } - - for _, entry := range wants { - e.peerLedger.Wants(p, entry.Entry) - } - } - -*/ - // Split the want-havek entries from the cancel and deny entries. func (e *Engine) splitWantsCancelsDenials(p peer.ID, m bsmsg.BitSwapMessage) ([]bsmsg.Entry, []bsmsg.Entry, []bsmsg.Entry, error) { entries := m.Wantlist() // creates copy; safe to modify diff --git a/bitswap/server/internal/decision/peer_ledger.go b/bitswap/server/internal/decision/peer_ledger.go index 52d610ae8..814879d97 100644 --- a/bitswap/server/internal/decision/peer_ledger.go +++ b/bitswap/server/internal/decision/peer_ledger.go @@ -23,13 +23,13 @@ func NewDefaultPeerLedger() *DefaultPeerLedger { // Wants adds an entry to the peer ledger. If adding the entry would make the // peer ledger exceed the size limit, then the entry is not added and false is -// returned. +// returned. A limit of zero is ignored. func (l *DefaultPeerLedger) Wants(p peer.ID, e wl.Entry, limit int) bool { cids, ok := l.peers[p] if !ok { cids = make(map[cid.Cid]entry) l.peers[p] = cids - } else if len(cids) == limit { + } else if limit != 0 && len(cids) == limit { if _, ok = cids[e.Cid]; !ok { return false // cannot add to peer ledger } From 25e948e63f67f47fde9d549b85b2230d214c9f3d Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Wed, 3 Jul 2024 23:28:16 -0700 Subject: [PATCH 04/13] Handle wantlist overflow by selectively replacing existing wants Handle incoming wants that could not be addded to the peer ledger without exceeding the peer want limit. These are handled by trying to make room for them by canceling existing wants for which there is no block. If this does not make sufficient room, then any lower priority wants that have blocks are canceled. --- bitswap/server/internal/decision/engine.go | 106 ++++++++++++------ .../server/internal/decision/engine_test.go | 91 ++++++++++----- 2 files changed, 136 insertions(+), 61 deletions(-) diff --git a/bitswap/server/internal/decision/engine.go b/bitswap/server/internal/decision/engine.go index 2362eb93e..5a769c552 100644 --- a/bitswap/server/internal/decision/engine.go +++ b/bitswap/server/internal/decision/engine.go @@ -718,32 +718,7 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap } if len(overflow) != 0 { - // Sort wl and overflow from least to most important. - peerWants := e.peerLedger.WantlistForPeer(p) - slices.SortFunc(peerWants, func(a, b wl.Entry) int { - return cmp.Compare(a.Priority, b.Priority) - }) - slices.SortFunc(overflow, func(a, b bsmsg.Entry) int { - return cmp.Compare(a.Entry.Priority, b.Entry.Priority) - }) - - // Put overflow wants onto the request queue by replacing entries that - // have the same or lower priority. - var replace int - for _, entry := range overflow { - if entry.Entry.Priority <= peerWants[replace].Priority { - // Everything in peerWants is equal or more improtant, so this - // overflow entry cannot replace any existing wants. 
- continue - } - entCid := peerWants[replace].Cid - replace++ - if e.peerLedger.CancelWant(p, entCid) { - e.peerRequestQueue.Remove(entCid, p) - } - e.peerLedger.Wants(p, entry.Entry, 0) - wants = append(wants, entry) - } + wants = e.handleOverflow(ctx, p, wants, overflow) } for _, entry := range cancels { @@ -763,12 +738,6 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap // Only add the task to the queue if the requester wants a DONT_HAVE if e.sendDontHaves && entry.SendDontHave { c := entry.Cid - - isWantBlock := false - if entry.WantType == pb.Message_Wantlist_Block { - isWantBlock = true - } - activeEntries = append(activeEntries, peertask.Task{ Topic: c, Priority: int(entry.Priority), @@ -776,7 +745,7 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap Data: &taskData{ BlockSize: 0, HaveBlock: false, - IsWantBlock: isWantBlock, + IsWantBlock: entry.WantType == pb.Message_Wantlist_Block, SendDontHave: entry.SendDontHave, }, }) @@ -835,6 +804,77 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap return false } +// handleOverflow processes incoming wants that could not be addded to the peer +// ledger without exceeding the peer want limit. These are handled by trying to +// make room by canceling existing wants for which there is no block. If this +// does not make sufficient room, then any lower priority wants that have +// blocks are canceled. +func (e *Engine) handleOverflow(ctx context.Context, p peer.ID, wants, overflow []bsmsg.Entry) []bsmsg.Entry { + existingWants := e.peerLedger.WantlistForPeer(p) + // Sort wl and overflow from least to most important. + slices.SortFunc(existingWants, func(a, b wl.Entry) int { + return cmp.Compare(a.Priority, b.Priority) + }) + slices.SortFunc(overflow, func(a, b bsmsg.Entry) int { + return cmp.Compare(a.Entry.Priority, b.Entry.Priority) + }) + + queuedWantKs := cid.NewSet() + for _, entry := range existingWants { + queuedWantKs.Add(entry.Cid) + } + queuedBlockSizes, err := e.bsm.getBlockSizes(ctx, queuedWantKs.Keys()) + if err != nil { + log.Info("aborting overflow processing", err) + return wants + } + + // Remove entries for blocks that are not present to make room for overflow. + var removed []int + for i, w := range existingWants { + if _, found := queuedBlockSizes[w.Cid]; !found { + // Cancel lowest priority dont-have. + if e.peerLedger.CancelWant(p, w.Cid) { + e.peerRequestQueue.Remove(w.Cid, p) + } + removed = append(removed, i) + // Add highest priority overflow. + lastOver := overflow[len(overflow)-1] + overflow = overflow[:len(overflow)-1] + e.peerLedger.Wants(p, lastOver.Entry, 0) + wants = append(wants, lastOver) + if len(overflow) == 0 { + break + } + } + } + + // Not enough dont-haves removed. Replace existing entries, that are a + // lower priority, with overflow entries. + var replace int + for _, overflowEnt := range overflow { + // Do not compare with removed existingWants entry. + for len(removed) != 0 && replace == removed[0] { + replace++ + removed = removed[1:] + } + if overflowEnt.Entry.Priority <= existingWants[replace].Priority { + // Everything in existingWants is equal or more improtant, so this + // overflow entry cannot replace any existing wants. 
+ continue + } + entCid := existingWants[replace].Cid + replace++ + if e.peerLedger.CancelWant(p, entCid) { + e.peerRequestQueue.Remove(entCid, p) + } + e.peerLedger.Wants(p, overflowEnt.Entry, 0) + wants = append(wants, overflowEnt) + } + + return wants +} + // Split the want-havek entries from the cancel and deny entries. func (e *Engine) splitWantsCancelsDenials(p peer.ID, m bsmsg.BitSwapMessage) ([]bsmsg.Entry, []bsmsg.Entry, []bsmsg.Entry, error) { entries := m.Wantlist() // creates copy; safe to modify diff --git a/bitswap/server/internal/decision/engine_test.go b/bitswap/server/internal/decision/engine_test.go index b9b566a1a..03a806f6f 100644 --- a/bitswap/server/internal/decision/engine_test.go +++ b/bitswap/server/internal/decision/engine_test.go @@ -1741,31 +1741,48 @@ func TestWantlistOverflow(t *testing.T) { const limit = 32 - warsaw := newTestEngine(ctx, "warsaw", WithMaxQueuedWantlistEntriesPerPeer(limit)) - riga := newTestEngine(ctx, "riga") - if warsaw.Peer == riga.Peer { - t.Fatal("Sanity Check: Peers have same Key!") - } + bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) var blockNum int m := message.New(false) for blockNum < limit { - m.AddEntry(blocks.NewBlock([]byte(fmt.Sprint(blockNum))).Cid(), 1, pb.Message_Wantlist_Block, true) + block := blocks.NewBlock([]byte(fmt.Sprint(blockNum))) + if err := bs.Put(context.Background(), block); err != nil { + t.Fatal(err) + } + m.AddEntry(block.Cid(), 1, pb.Message_Wantlist_Block, true) blockNum++ } - lowPrioCids := make([]cid.Cid, 0, 5) - for blockNum < cap(lowPrioCids) { + + fpt := &fakePeerTagger{} + e := newEngineForTesting(ctx, bs, fpt, "localhost", 0, WithScoreLedger(NewTestScoreLedger(shortTerm, nil, clock.New())), WithBlockstoreWorkerCount(4), WithMaxQueuedWantlistEntriesPerPeer(limit)) + e.StartWorkers(ctx, process.WithTeardown(func() error { return nil })) + warsaw := engineSet{ + Peer: peer.ID("warsaw"), + PeerTagger: fpt, + Blockstore: bs, + Engine: e, + } + + //warsaw := newTestEngine(ctx, "warsaw", WithMaxQueuedWantlistEntriesPerPeer(limit)) + riga := newTestEngine(ctx, "riga") + if warsaw.Peer == riga.Peer { + t.Fatal("Sanity Check: Peers have same Key!") + } + + lowPrioCids := make([]cid.Cid, 5) + for i := 0; i < cap(lowPrioCids); i++ { c := blocks.NewBlock([]byte(fmt.Sprint(blockNum))).Cid() blockNum++ m.AddEntry(c, 0, pb.Message_Wantlist_Block, true) - lowPrioCids = append(lowPrioCids, c) + lowPrioCids[i] = c } - highPrioCids := make([]cid.Cid, 0, 5) - for blockNum < cap(highPrioCids) { + highPrioCids := make([]cid.Cid, 5) + for i := 0; i < cap(highPrioCids); i++ { c := blocks.NewBlock([]byte(fmt.Sprint(blockNum))).Cid() blockNum++ m.AddEntry(c, 10, pb.Message_Wantlist_Block, true) - lowPrioCids = append(highPrioCids, c) + highPrioCids[i] = c } warsaw.Engine.MessageReceived(ctx, riga.Peer, m) @@ -1788,35 +1805,53 @@ func TestWantlistOverflow(t *testing.T) { } } + // These 7 new wants should overflow and 5 ot them should replace existing + // wants that do not have blocks. 
m = message.New(false) - - lowPrioCids = lowPrioCids[:0] - for blockNum < cap(lowPrioCids) { + blockCids := make([]cid.Cid, 7) + for i := 0; i < cap(blockCids); i++ { c := blocks.NewBlock([]byte(fmt.Sprint(blockNum))).Cid() blockNum++ m.AddEntry(c, 0, pb.Message_Wantlist_Block, true) - lowPrioCids = append(lowPrioCids, c) + blockCids[i] = c + } + warsaw.Engine.MessageReceived(ctx, riga.Peer, m) + wl = warsaw.Engine.WantlistForPeer(riga.Peer) + if len(wl) != limit { + t.Fatal("wantlist size", len(wl), "does not match limit", limit) } - highPrioCids = highPrioCids[:0] - for blockNum < cap(highPrioCids) { + + var findCount int + for _, c := range blockCids { + if findCid(c, wl) { + findCount++ + } + } + if findCount != len(highPrioCids) { + t.Fatal("expected", len(highPrioCids), "of the new blocks, found", findCount) + } + + // These 7 new wants should overflow and all 7 ot them should replace + // existing wants, 5 that do not have blocks, and 2 that have blocks but + // are lower priority. + m = message.New(false) + for i := 0; i < cap(blockCids); i++ { c := blocks.NewBlock([]byte(fmt.Sprint(blockNum))).Cid() blockNum++ - m.AddEntry(c, 10, pb.Message_Wantlist_Block, true) - lowPrioCids = append(highPrioCids, c) + m.AddEntry(c, 11, pb.Message_Wantlist_Block, true) + blockCids[i] = c } warsaw.Engine.MessageReceived(ctx, riga.Peer, m) + wl = warsaw.Engine.WantlistForPeer(riga.Peer) - // Check that low priority entries not on wantlist. - for _, c := range lowPrioCids { + findCount = 0 + for _, c := range blockCids { if findCid(c, wl) { - t.Fatal("low priority entry should not be on wantlist") + findCount++ } } - // Check that high priority entries are all on wantlist. - for _, c := range highPrioCids { - if !findCid(c, wl) { - t.Fatal("expected high priority entry on wantlist") - } + if findCount != len(blockCids) { + t.Fatal("expected", len(blockCids), "of the new blocks, found", findCount) } } From 3f54359b48d544b1210510eaca95e376fa58ef76 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Thu, 4 Jul 2024 01:17:12 -0700 Subject: [PATCH 05/13] Add comment for WithMaxQueuedWantlistEntriesPerPeer --- bitswap/server/internal/decision/engine.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/bitswap/server/internal/decision/engine.go b/bitswap/server/internal/decision/engine.go index 5a769c552..290f50f4d 100644 --- a/bitswap/server/internal/decision/engine.go +++ b/bitswap/server/internal/decision/engine.go @@ -316,8 +316,11 @@ func WithMaxOutstandingBytesPerPeer(count int) Option { } } -// WithMaxQueuedWantlistEntriesPerPeer limits how much individual entries each peer is allowed to send. -// If a peer send us more than this we will truncate newest entries. +// WithMaxQueuedWantlistEntriesPerPeer limits how many individual entries each +// peer is allowed to send. If a peer sends more than this, then the lowest +// priority entries are truncated to this limit. If there is insufficient space +// to enqueue new entries, then older existing wants with no associated blocks, +// and lower priority wants, are canceled to make room for the new wants. 
func WithMaxQueuedWantlistEntriesPerPeer(count uint) Option { return func(e *Engine) { e.maxQueuedWantlistEntriesPerPeer = count From 48a42c8bd2c419120005730ba7bf48a01d4749a4 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Fri, 5 Jul 2024 23:18:12 -0700 Subject: [PATCH 06/13] changes from review --- bitswap/server/internal/decision/engine.go | 78 +++++++++++-------- .../server/internal/decision/engine_test.go | 3 +- .../server/internal/decision/peer_ledger.go | 14 ++-- out | 0 4 files changed, 56 insertions(+), 39 deletions(-) create mode 100644 out diff --git a/bitswap/server/internal/decision/engine.go b/bitswap/server/internal/decision/engine.go index 290f50f4d..9633fbde0 100644 --- a/bitswap/server/internal/decision/engine.go +++ b/bitswap/server/internal/decision/engine.go @@ -133,7 +133,7 @@ type PeerEntry struct { // PeerLedger is an external ledger dealing with peers and their want lists. type PeerLedger interface { // Wants informs the ledger that [peer.ID] wants [wl.Entry]. - Wants(p peer.ID, e wl.Entry, limit int) bool + Wants(p peer.ID, e wl.Entry) bool // CancelWant returns true if the [cid.Cid] was removed from the wantlist of [peer.ID]. CancelWant(p peer.ID, k cid.Cid) bool @@ -406,7 +406,6 @@ func newEngine( taskWorkerCount: defaults.BitswapEngineTaskWorkerCount, sendDontHaves: true, self: self, - peerLedger: NewDefaultPeerLedger(), pendingGauge: bmetrics.PendingEngineGauge(ctx), activeGauge: bmetrics.ActiveEngineGauge(ctx), targetMessageSize: defaultTargetMessageSize, @@ -420,6 +419,11 @@ func newEngine( opt(e) } + // If peerLedger was not set by option, then create a default instance. + if e.peerLedger == nil { + e.peerLedger = NewDefaultPeerLedger(e.maxQueuedWantlistEntriesPerPeer) + } + e.bsm = newBlockstoreManager(bs, e.bstoreWorkerCount, bmetrics.PendingBlocksGauge(ctx), bmetrics.ActiveBlocksGauge(ctx)) // default peer task queue options @@ -687,6 +691,20 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap return true } + // If there is a possibility of overflow, sort the wantlist to make sure + // the highest priority items are put into the free space. + freeSpace := int(e.maxQueuedWantlistEntriesPerPeer) - e.peerLedger.WantlistSizeForPeer(p) + if len(wants) > freeSpace { + // Sort incoming wants from most to least important. + slices.SortFunc(wants, func(a, b bsmsg.Entry) int { + return cmp.Compare(b.Entry.Priority, a.Entry.Priority) + }) + // Do not take more wants that can be handled. + if len(wants) > int(e.maxQueuedWantlistEntriesPerPeer) { + wants = wants[:int(e.maxQueuedWantlistEntriesPerPeer)] + } + } + // Get block sizes wantKs := cid.NewSet() for _, entry := range wants { @@ -708,7 +726,7 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap if len(wants) != 0 { filteredWants := wants[:0] // shift inplace for _, entry := range wants { - if !e.peerLedger.Wants(p, entry.Entry, int(e.maxQueuedWantlistEntriesPerPeer)) { + if !e.peerLedger.Wants(p, entry.Entry) { // Cannot add entry because it would exceed size limit. overflow = append(overflow, entry) continue @@ -721,7 +739,8 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap } if len(overflow) != 0 { - wants = e.handleOverflow(ctx, p, wants, overflow) + // Overflow is already sorted, so no need to do it here. 
+ wants = e.handleOverflow(ctx, p, overflow, wants) } for _, entry := range cancels { @@ -812,14 +831,17 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap // make room by canceling existing wants for which there is no block. If this // does not make sufficient room, then any lower priority wants that have // blocks are canceled. -func (e *Engine) handleOverflow(ctx context.Context, p peer.ID, wants, overflow []bsmsg.Entry) []bsmsg.Entry { +// +// This assumes that overflow is already sorted from most to least important. +// +// Important: handleOverflwo must be called e.lock is locked. +func (e *Engine) handleOverflow(ctx context.Context, p peer.ID, overflow, wants []bsmsg.Entry) []bsmsg.Entry { existingWants := e.peerLedger.WantlistForPeer(p) - // Sort wl and overflow from least to most important. + + // Sort wl from least to most important, to try to replace lowest priority + // items first. slices.SortFunc(existingWants, func(a, b wl.Entry) int { - return cmp.Compare(a.Priority, b.Priority) - }) - slices.SortFunc(overflow, func(a, b bsmsg.Entry) int { - return cmp.Compare(a.Entry.Priority, b.Entry.Priority) + return cmp.Compare(b.Priority, a.Priority) }) queuedWantKs := cid.NewSet() @@ -841,19 +863,20 @@ func (e *Engine) handleOverflow(ctx context.Context, p peer.ID, wants, overflow e.peerRequestQueue.Remove(w.Cid, p) } removed = append(removed, i) - // Add highest priority overflow. - lastOver := overflow[len(overflow)-1] - overflow = overflow[:len(overflow)-1] - e.peerLedger.Wants(p, lastOver.Entry, 0) - wants = append(wants, lastOver) + // Pop hoghest priority overflow. + firstOver := overflow[0] + overflow = overflow[1:] + // Add highest priority overflow to wants. + e.peerLedger.Wants(p, firstOver.Entry) + wants = append(wants, firstOver) if len(overflow) == 0 { - break + return wants } } } - // Not enough dont-haves removed. Replace existing entries, that are a - // lower priority, with overflow entries. + // Replace existing entries, that are a lower priority, with overflow + // entries. var replace int for _, overflowEnt := range overflow { // Do not compare with removed existingWants entry. @@ -861,17 +884,17 @@ func (e *Engine) handleOverflow(ctx context.Context, p peer.ID, wants, overflow replace++ removed = removed[1:] } - if overflowEnt.Entry.Priority <= existingWants[replace].Priority { - // Everything in existingWants is equal or more improtant, so this - // overflow entry cannot replace any existing wants. - continue + if overflowEnt.Entry.Priority < existingWants[replace].Priority { + // All overflow entries have too low of priority to replace any + // existing wants. + break } entCid := existingWants[replace].Cid replace++ if e.peerLedger.CancelWant(p, entCid) { e.peerRequestQueue.Remove(entCid, p) } - e.peerLedger.Wants(p, overflowEnt.Entry, 0) + e.peerLedger.Wants(p, overflowEnt.Entry) wants = append(wants, overflowEnt) } @@ -923,15 +946,6 @@ func (e *Engine) splitWantsCancelsDenials(p peer.ID, m bsmsg.BitSwapMessage) ([] wants = nil } - // Do not take more wants that can be handled. - if len(wants) > int(e.maxQueuedWantlistEntriesPerPeer) { - // Keep the highest priority wants. - slices.SortFunc(wants, func(a, b bsmsg.Entry) int { - return cmp.Compare(b.Entry.Priority, a.Entry.Priority) - }) - wants = wants[:int(e.maxQueuedWantlistEntriesPerPeer)] - } - // Clear truncated entries. 
clear(entries[len(wants):]) diff --git a/bitswap/server/internal/decision/engine_test.go b/bitswap/server/internal/decision/engine_test.go index 03a806f6f..1d0a02080 100644 --- a/bitswap/server/internal/decision/engine_test.go +++ b/bitswap/server/internal/decision/engine_test.go @@ -1764,7 +1764,6 @@ func TestWantlistOverflow(t *testing.T) { Engine: e, } - //warsaw := newTestEngine(ctx, "warsaw", WithMaxQueuedWantlistEntriesPerPeer(limit)) riga := newTestEngine(ctx, "riga") if warsaw.Peer == riga.Peer { t.Fatal("Sanity Check: Peers have same Key!") @@ -1805,7 +1804,7 @@ func TestWantlistOverflow(t *testing.T) { } } - // These 7 new wants should overflow and 5 ot them should replace existing + // These 7 new wants should overflow and 5 of them should replace existing // wants that do not have blocks. m = message.New(false) blockCids := make([]cid.Cid, 7) diff --git a/bitswap/server/internal/decision/peer_ledger.go b/bitswap/server/internal/decision/peer_ledger.go index 814879d97..227e50de1 100644 --- a/bitswap/server/internal/decision/peer_ledger.go +++ b/bitswap/server/internal/decision/peer_ledger.go @@ -12,24 +12,28 @@ type DefaultPeerLedger struct { // these two maps are inversions of each other peers map[peer.ID]map[cid.Cid]entry cids map[cid.Cid]map[peer.ID]entry + // value 0 mean no limit + maxEntriesPerPeer int } -func NewDefaultPeerLedger() *DefaultPeerLedger { +func NewDefaultPeerLedger(maxEntriesPerPeer uint) *DefaultPeerLedger { return &DefaultPeerLedger{ peers: make(map[peer.ID]map[cid.Cid]entry), cids: make(map[cid.Cid]map[peer.ID]entry), + + maxEntriesPerPeer: int(maxEntriesPerPeer), } } // Wants adds an entry to the peer ledger. If adding the entry would make the -// peer ledger exceed the size limit, then the entry is not added and false is -// returned. A limit of zero is ignored. -func (l *DefaultPeerLedger) Wants(p peer.ID, e wl.Entry, limit int) bool { +// peer ledger exceed the maxEntriesPerPeer limit, then the entry is not added +// and false is returned. +func (l *DefaultPeerLedger) Wants(p peer.ID, e wl.Entry) bool { cids, ok := l.peers[p] if !ok { cids = make(map[cid.Cid]entry) l.peers[p] = cids - } else if limit != 0 && len(cids) == limit { + } else if l.maxEntriesPerPeer != 0 && len(cids) == l.maxEntriesPerPeer { if _, ok = cids[e.Cid]; !ok { return false // cannot add to peer ledger } diff --git a/out b/out new file mode 100644 index 000000000..e69de29bb From 7c11036348299f873254872783701aabcb8ad325 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Sat, 6 Jul 2024 04:09:00 -0700 Subject: [PATCH 07/13] Fix race, do not sort wants unles overflow, and truncate over-limit wants --- bitswap/server/internal/decision/engine.go | 27 ++---- .../server/internal/decision/engine_test.go | 93 +++++++++++-------- 2 files changed, 65 insertions(+), 55 deletions(-) diff --git a/bitswap/server/internal/decision/engine.go b/bitswap/server/internal/decision/engine.go index 9633fbde0..d6257c874 100644 --- a/bitswap/server/internal/decision/engine.go +++ b/bitswap/server/internal/decision/engine.go @@ -691,18 +691,9 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap return true } - // If there is a possibility of overflow, sort the wantlist to make sure - // the highest priority items are put into the free space. - freeSpace := int(e.maxQueuedWantlistEntriesPerPeer) - e.peerLedger.WantlistSizeForPeer(p) - if len(wants) > freeSpace { - // Sort incoming wants from most to least important. 
- slices.SortFunc(wants, func(a, b bsmsg.Entry) int { - return cmp.Compare(b.Entry.Priority, a.Entry.Priority) - }) - // Do not take more wants that can be handled. - if len(wants) > int(e.maxQueuedWantlistEntriesPerPeer) { - wants = wants[:int(e.maxQueuedWantlistEntriesPerPeer)] - } + // Do not take more wants that can be handled. + if len(wants) > int(e.maxQueuedWantlistEntriesPerPeer) { + wants = wants[:int(e.maxQueuedWantlistEntriesPerPeer)] } // Get block sizes @@ -739,7 +730,6 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap } if len(overflow) != 0 { - // Overflow is already sorted, so no need to do it here. wants = e.handleOverflow(ctx, p, overflow, wants) } @@ -832,14 +822,15 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap // does not make sufficient room, then any lower priority wants that have // blocks are canceled. // -// This assumes that overflow is already sorted from most to least important. -// // Important: handleOverflwo must be called e.lock is locked. func (e *Engine) handleOverflow(ctx context.Context, p peer.ID, overflow, wants []bsmsg.Entry) []bsmsg.Entry { + // Sort overflow from most to least important. + slices.SortFunc(overflow, func(a, b bsmsg.Entry) int { + return cmp.Compare(b.Entry.Priority, a.Entry.Priority) + }) + // Sort existing wants from least to most important, to try to replace + // lowest priority items first. existingWants := e.peerLedger.WantlistForPeer(p) - - // Sort wl from least to most important, to try to replace lowest priority - // items first. slices.SortFunc(existingWants, func(a, b wl.Entry) int { return cmp.Compare(b.Priority, a.Priority) }) diff --git a/bitswap/server/internal/decision/engine_test.go b/bitswap/server/internal/decision/engine_test.go index 1d0a02080..d644dfdec 100644 --- a/bitswap/server/internal/decision/engine_test.go +++ b/bitswap/server/internal/decision/engine_test.go @@ -1743,14 +1743,18 @@ func TestWantlistOverflow(t *testing.T) { bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) + origCids := make([]cid.Cid, limit) var blockNum int m := message.New(false) for blockNum < limit { block := blocks.NewBlock([]byte(fmt.Sprint(blockNum))) - if err := bs.Put(context.Background(), block); err != nil { - t.Fatal(err) + if blockNum != 0 { // do not put first block in blockstore. + if err := bs.Put(context.Background(), block); err != nil { + t.Fatal(err) + } } m.AddEntry(block.Cid(), 1, pb.Message_Wantlist_Block, true) + origCids[blockNum] = block.Cid() blockNum++ } @@ -1763,12 +1767,20 @@ func TestWantlistOverflow(t *testing.T) { Blockstore: bs, Engine: e, } - riga := newTestEngine(ctx, "riga") if warsaw.Peer == riga.Peer { t.Fatal("Sanity Check: Peers have same Key!") } + warsaw.Engine.MessageReceived(ctx, riga.Peer, m) + // Check that the wantlist is at the size limit. 
+ wl := warsaw.Engine.WantlistForPeer(riga.Peer) + if len(wl) != limit { + t.Fatal("wantlist size", len(wl), "does not match limit", limit) + } + t.Log("Senr message with", limit, "medium-priority wants and", limit-1, "have blocks present") + + m = message.New(false) lowPrioCids := make([]cid.Cid, 5) for i := 0; i < cap(lowPrioCids); i++ { c := blocks.NewBlock([]byte(fmt.Sprint(blockNum))).Cid() @@ -1776,6 +1788,25 @@ func TestWantlistOverflow(t *testing.T) { m.AddEntry(c, 0, pb.Message_Wantlist_Block, true) lowPrioCids[i] = c } + warsaw.Engine.MessageReceived(ctx, riga.Peer, m) + wl = warsaw.Engine.WantlistForPeer(riga.Peer) + if len(wl) != limit { + t.Fatal("wantlist size", len(wl), "does not match limit", limit) + } + // Check that one low priority entry is on the wantlist, since there is one + // existing entry without a blocks and none at a lower priority. + var count int + for _, c := range lowPrioCids { + if findCid(c, wl) { + count++ + } + } + if count != 1 { + t.Fatal("Expected 1 low priority entry on wantlist, found", count) + } + t.Log("Sent message with", len(lowPrioCids), "low-priority wants. One accepted as replacement for existig want without block.") + + m = message.New(false) highPrioCids := make([]cid.Cid, 5) for i := 0; i < cap(highPrioCids); i++ { c := blocks.NewBlock([]byte(fmt.Sprint(blockNum))).Cid() @@ -1784,30 +1815,24 @@ func TestWantlistOverflow(t *testing.T) { highPrioCids[i] = c } warsaw.Engine.MessageReceived(ctx, riga.Peer, m) - - // Check that the wantlist is at the size limit. - wl := warsaw.Engine.WantlistForPeer(riga.Peer) + wl = warsaw.Engine.WantlistForPeer(riga.Peer) if len(wl) != limit { t.Fatal("wantlist size", len(wl), "does not match limit", limit) } - - // Check that low priority entries not on wantlist. - for _, c := range lowPrioCids { - if findCid(c, wl) { - t.Fatal("low priority entry should not be on wantlist") - } - } - // Check that high priority entries are all on wantlist. + // Check that all high priority entries are all on wantlist, since there + // were existing entries with lower priority. for _, c := range highPrioCids { if !findCid(c, wl) { t.Fatal("expected high priority entry on wantlist") } } + t.Log("Sent message with", len(highPrioCids), "high-priority wants. All accepted replacing wants without block or low priority.") - // These 7 new wants should overflow and 5 of them should replace existing - // wants that do not have blocks. + // These new wants should overflow and some of them should replace existing + // wants that do not have blocks (the high-priority weants from the + // previous message). 
m = message.New(false) - blockCids := make([]cid.Cid, 7) + blockCids := make([]cid.Cid, len(highPrioCids)+2) for i := 0; i < cap(blockCids); i++ { c := blocks.NewBlock([]byte(fmt.Sprint(blockNum))).Cid() blockNum++ @@ -1820,38 +1845,32 @@ func TestWantlistOverflow(t *testing.T) { t.Fatal("wantlist size", len(wl), "does not match limit", limit) } - var findCount int + count = 0 for _, c := range blockCids { if findCid(c, wl) { - findCount++ + count++ } } - if findCount != len(highPrioCids) { - t.Fatal("expected", len(highPrioCids), "of the new blocks, found", findCount) + if count != len(highPrioCids) { + t.Fatal("expected", len(highPrioCids), "of the new blocks, found", count) } + t.Log("Sent message with", len(blockCids), "low-priority wants.", count, "accepted replacing wants without blocks from previous message") - // These 7 new wants should overflow and all 7 ot them should replace - // existing wants, 5 that do not have blocks, and 2 that have blocks but - // are lower priority. + // Send the original wants. Some should replace the existing wants that do + // not have blocks associated, and the rest should overwrite the existing + // ones. m = message.New(false) - for i := 0; i < cap(blockCids); i++ { - c := blocks.NewBlock([]byte(fmt.Sprint(blockNum))).Cid() - blockNum++ - m.AddEntry(c, 11, pb.Message_Wantlist_Block, true) - blockCids[i] = c + for _, c := range origCids { + m.AddEntry(c, 0, pb.Message_Wantlist_Block, true) } warsaw.Engine.MessageReceived(ctx, riga.Peer, m) wl = warsaw.Engine.WantlistForPeer(riga.Peer) - - findCount = 0 - for _, c := range blockCids { - if findCid(c, wl) { - findCount++ + for _, c := range origCids { + if !findCid(c, wl) { + t.Fatal("missing low-priority original wants to overwrite existing") } } - if findCount != len(blockCids) { - t.Fatal("expected", len(blockCids), "of the new blocks, found", findCount) - } + t.Log("Sent message with", len(origCids), "original wants at low priority. All accepted overwriting existing wants.") } func findCid(c cid.Cid, wantList []wl.Entry) bool { From 994bd46e3801b37e5b482803c487136dd413e363 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Mon, 8 Jul 2024 09:54:57 -0700 Subject: [PATCH 08/13] Add logging about overflow handling --- bitswap/server/internal/decision/engine.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/bitswap/server/internal/decision/engine.go b/bitswap/server/internal/decision/engine.go index d6257c874..f5ff8b12b 100644 --- a/bitswap/server/internal/decision/engine.go +++ b/bitswap/server/internal/decision/engine.go @@ -691,11 +691,6 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap return true } - // Do not take more wants that can be handled. - if len(wants) > int(e.maxQueuedWantlistEntriesPerPeer) { - wants = wants[:int(e.maxQueuedWantlistEntriesPerPeer)] - } - // Get block sizes wantKs := cid.NewSet() for _, entry := range wants { @@ -730,6 +725,7 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap } if len(overflow) != 0 { + log.Infow("handling wantlist overflow", "local", e.self, "from", p, "wantlistSize", len(wants), "overflowSize", len(overflow)) wants = e.handleOverflow(ctx, p, overflow, wants) } @@ -930,7 +926,10 @@ func (e *Engine) splitWantsCancelsDenials(p peer.ID, m bsmsg.BitSwapMessage) ([] continue } - wants = append(wants, et) + // Do not take more wants that can be handled. 
+        if len(wants) < int(e.maxQueuedWantlistEntriesPerPeer) {
+            wants = append(wants, et)
+        }
     }
 
     if len(wants) == 0 {

From 50421b4dbc791d1c2505b7f1972dd486c9fea2f9 Mon Sep 17 00:00:00 2001
From: gammazero <11790789+gammazero@users.noreply.github.com>
Date: Mon, 8 Jul 2024 11:39:07 -0700
Subject: [PATCH 09/13] Add test to illustrate failure prior to this PR, and fix afterward

---
 .../server/internal/decision/engine_test.go | 85 +++++++++++++++++++
 1 file changed, 85 insertions(+)

diff --git a/bitswap/server/internal/decision/engine_test.go b/bitswap/server/internal/decision/engine_test.go
index d644dfdec..a9728d5a3 100644
--- a/bitswap/server/internal/decision/engine_test.go
+++ b/bitswap/server/internal/decision/engine_test.go
@@ -1735,6 +1735,91 @@ func TestKillConnectionForInlineCid(t *testing.T) {
     }
 }
 
+func TestWantlistBlocked(t *testing.T) {
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
+    const limit = 32
+
+    bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
+
+    // Generate a set of blocks that the server has.
+    haveCids := make([]cid.Cid, limit)
+    var blockNum int
+    for blockNum < limit {
+        block := blocks.NewBlock([]byte(fmt.Sprint(blockNum)))
+        if blockNum != 0 { // do not put first block in blockstore.
+            if err := bs.Put(context.Background(), block); err != nil {
+                t.Fatal(err)
+            }
+        }
+        haveCids[blockNum] = block.Cid()
+        blockNum++
+    }
+
+    fpt := &fakePeerTagger{}
+    e := newEngineForTesting(ctx, bs, fpt, "localhost", 0, WithScoreLedger(NewTestScoreLedger(shortTerm, nil, clock.New())), WithBlockstoreWorkerCount(4), WithMaxQueuedWantlistEntriesPerPeer(limit))
+    e.StartWorkers(ctx, process.WithTeardown(func() error { return nil }))
+    warsaw := engineSet{
+        Peer:       peer.ID("warsaw"),
+        PeerTagger: fpt,
+        Blockstore: bs,
+        Engine:     e,
+    }
+    riga := newTestEngine(ctx, "riga")
+    if warsaw.Peer == riga.Peer {
+        t.Fatal("Sanity Check: Peers have same Key!")
+    }
+
+    m := message.New(false)
+    dontHaveCids := make([]cid.Cid, limit)
+    for i := 0; i < limit; i++ {
+        c := blocks.NewBlock([]byte(fmt.Sprint(blockNum))).Cid()
+        blockNum++
+        m.AddEntry(c, 1, pb.Message_Wantlist_Block, true)
+        dontHaveCids[i] = c
+    }
+    warsaw.Engine.MessageReceived(ctx, riga.Peer, m)
+    wl := warsaw.Engine.WantlistForPeer(riga.Peer)
+    // Check that all the dontHave wants are on the wantlist.
+    for _, c := range dontHaveCids {
+        if !findCid(c, wl) {
+            t.Fatal("Expected all dontHaveCids to be on wantlist")
+        }
+    }
+    t.Log("All", len(wl), "dont-have CIDs are on wantlist")
+
+    m = message.New(false)
+    for _, c := range haveCids {
+        m.AddEntry(c, 1, pb.Message_Wantlist_Block, true)
+    }
+    warsaw.Engine.MessageReceived(ctx, riga.Peer, m)
+    wl = warsaw.Engine.WantlistForPeer(riga.Peer)
+    // Check that all the haveCids wants are on the wantlist.
+    for _, c := range haveCids {
+        if !findCid(c, wl) {
+            t.Fatal("Missing expected want. Expected all haveCids to be on wantlist")
+        }
+    }
+    t.Log("All", len(wl), "new have CIDs are now on wantlist")
+
+    m = message.New(false)
+    for i := 0; i < limit; i++ {
+        c := blocks.NewBlock([]byte(fmt.Sprint(blockNum))).Cid()
+        blockNum++
+        m.AddEntry(c, 1, pb.Message_Wantlist_Block, true)
+        dontHaveCids[i] = c
+    }
+    warsaw.Engine.MessageReceived(ctx, riga.Peer, m)
+    // Check that all the new dontHave wants are not on the wantlist.
+    for _, c := range dontHaveCids {
+        if findCid(c, wl) {
+            t.Fatal("No new dontHaveCids should be on wantlist")
+        }
+    }
+    t.Log("All", len(wl), "new dont-have CIDs are not on wantlist")
+}
+
 func TestWantlistOverflow(t *testing.T) {
     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()

From ffd64ac5ff0846e8476efdbe7ce62bbbd37b2ccf Mon Sep 17 00:00:00 2001
From: gammazero <11790789+gammazero@users.noreply.github.com>
Date: Mon, 8 Jul 2024 11:58:50 -0700
Subject: [PATCH 10/13] Longer timeout for slow CI

---
 bitswap/client/internal/messagequeue/messagequeue_test.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/bitswap/client/internal/messagequeue/messagequeue_test.go b/bitswap/client/internal/messagequeue/messagequeue_test.go
index 3f6a2f622..e9b8f7c54 100644
--- a/bitswap/client/internal/messagequeue/messagequeue_test.go
+++ b/bitswap/client/internal/messagequeue/messagequeue_test.go
@@ -19,7 +19,7 @@ import (
     "github.com/libp2p/go-libp2p/p2p/protocol/ping"
 )
 
-const collectTimeout = 100 * time.Millisecond
+const collectTimeout = 200 * time.Millisecond
 
 type fakeMessageNetwork struct {
     connectError error

From 8664e385205ccc749dd4113d581a0e5889cf2f7f Mon Sep 17 00:00:00 2001
From: Andrew Gillis <11790789+gammazero@users.noreply.github.com>
Date: Mon, 29 Jul 2024 09:13:19 -0700
Subject: [PATCH 11/13] Update bitswap/server/internal/decision/engine_test.go

Co-authored-by: Marcin Rataj
---
 bitswap/server/internal/decision/engine_test.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/bitswap/server/internal/decision/engine_test.go b/bitswap/server/internal/decision/engine_test.go
index a9728d5a3..b83342302 100644
--- a/bitswap/server/internal/decision/engine_test.go
+++ b/bitswap/server/internal/decision/engine_test.go
@@ -1863,7 +1863,7 @@ func TestWantlistOverflow(t *testing.T) {
     if len(wl) != limit {
         t.Fatal("wantlist size", len(wl), "does not match limit", limit)
     }
-    t.Log("Senr message with", limit, "medium-priority wants and", limit-1, "have blocks present")
+    t.Log("Sent message with", limit, "medium-priority wants and", limit-1, "have blocks present")
 
     m = message.New(false)
     lowPrioCids := make([]cid.Cid, 5)

From 0d1f8e1d6ad1f8ed728292082a9ae6c0d37ac8f3 Mon Sep 17 00:00:00 2001
From: Andrew Gillis <11790789+gammazero@users.noreply.github.com>
Date: Mon, 29 Jul 2024 09:13:38 -0700
Subject: [PATCH 12/13] Update bitswap/server/internal/decision/engine.go

Co-authored-by: Marcin Rataj
---
 bitswap/server/internal/decision/engine.go | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/bitswap/server/internal/decision/engine.go b/bitswap/server/internal/decision/engine.go
index 55286f7ef..a40345d8f 100644
--- a/bitswap/server/internal/decision/engine.go
+++ b/bitswap/server/internal/decision/engine.go
@@ -133,6 +133,8 @@ type PeerEntry struct {
 // PeerLedger is an external ledger dealing with peers and their want lists.
 type PeerLedger interface {
     // Wants informs the ledger that [peer.ID] wants [wl.Entry].
+    // If the peer ledger exceeds its internal limit, then the entry is not
+    // added and false is returned.
     Wants(p peer.ID, e wl.Entry) bool
 
     // CancelWant returns true if the [cid.Cid] was removed from the wantlist of [peer.ID].
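
The doc comment added above is the contract that overflow handling relies on: a false return from Wants means the peer's ledger is already at its per-peer limit, so the entry was not queued. Below is a minimal, self-contained sketch (not part of the patch) of how a caller can use that return value to split a request into accepted wants and overflow. String keys stand in for peer.ID and wl.Entry; only the "set aside overflow for separate handling" idea mirrors the handleOverflow call shown in the engine hunks.

package wantsketch

// WantLedger is a pared-down stand-in for the PeerLedger interface above.
type WantLedger interface {
	// Wants reports whether the entry was added; false means the per-peer
	// limit was reached and the entry was not queued.
	Wants(peer, key string) bool
}

// splitOverflow adds each requested key to the ledger and collects the keys
// that did not fit, so they can be handled separately (for example by
// canceling older entries to make room).
func splitOverflow(l WantLedger, peer string, keys []string) (accepted, overflow []string) {
	for _, k := range keys {
		if l.Wants(peer, k) {
			accepted = append(accepted, k)
		} else {
			overflow = append(overflow, k)
		}
	}
	return accepted, overflow
}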
From b0163277098a21fdaac9f9545e55dff20edb631e Mon Sep 17 00:00:00 2001
From: Andrew Gillis <11790789+gammazero@users.noreply.github.com>
Date: Mon, 29 Jul 2024 09:13:52 -0700
Subject: [PATCH 13/13] Update CHANGELOG.md

Co-authored-by: Marcin Rataj
---
 CHANGELOG.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index fe6eab246..8a099ceef 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -44,7 +44,7 @@ The following emojis are used to highlight certain changes:
 
 - `routing/http`: the `FindPeer` now returns `routing.ErrNotFound` when no addresses are found
 - `routing/http`: the `FindProvidersAsync` no longer causes a goroutine buildup
-- bitswap wantlist overflow handling now cancels existing entries to make room for newer entries. This fix prevents the wantlist from filling up with CIDs that the server does not have.
+- `bitswap`: wantlist overflow handling now cancels existing entries to make room for newer entries. This fix prevents the wantlist from filling up with CIDs that the server does not have.
 
 ## [v0.20.0]
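
The CHANGELOG entry above summarizes the behavior of the whole series: when a peer's wantlist is full, previously queued entries are canceled so newer requests are not silently dropped. A simplified, illustrative sketch of that idea follows. String keys stand in for cid.Cid and peer.ID, and Ledger and Queue are hypothetical interfaces modeled on the CancelWant and peerRequestQueue.Remove calls in the engine hunks, not the actual boxo API.

package overflowsketch

// Ledger and Queue are hypothetical, minimal interfaces used only to
// illustrate the "cancel existing entries to make room" behavior.
type Ledger interface {
	WantlistForPeer(peer string) []string // keys currently wanted by peer
	CancelWant(peer, key string) bool     // true if the key was removed
}

type Queue interface {
	Remove(key, peer string) // drop any queued task for this want
}

// makeRoom cancels up to needed existing wants for peer so that the same
// number of newer wants can be queued instead of being rejected.
func makeRoom(l Ledger, q Queue, peer string, needed int) {
	for _, key := range l.WantlistForPeer(peer) {
		if needed == 0 {
			return
		}
		if l.CancelWant(peer, key) {
			q.Remove(key, peer)
			needed--
		}
	}
}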