From b8cc3cf40b2c9d8f1044a29ef6a9049c9cb17af2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Milo=C5=A1=20=C5=BDivkovi=C4=87?= Date: Tue, 7 Jan 2025 16:38:35 +0100 Subject: [PATCH] Add non-blocking to p2p event streams --- tm2/pkg/internal/p2p/p2p.go | 9 ++++++--- tm2/pkg/p2p/events/events.go | 12 ++++++++++-- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/tm2/pkg/internal/p2p/p2p.go b/tm2/pkg/internal/p2p/p2p.go index 2a16646a159..1e650e0cd25 100644 --- a/tm2/pkg/internal/p2p/p2p.go +++ b/tm2/pkg/internal/p2p/p2p.go @@ -131,10 +131,13 @@ func MakeConnectedPeers( multiplexSwitch.DialPeers(addrs...) // Set up an exit timer - timer := time.NewTimer(5 * time.Second) + timer := time.NewTimer(1 * time.Minute) defer timer.Stop() - connectedPeers := make(map[p2pTypes.ID]struct{}) + var ( + connectedPeers = make(map[p2pTypes.ID]struct{}) + targetPeers = cfg.Count - 1 + ) for { select { @@ -143,7 +146,7 @@ func MakeConnectedPeers( connectedPeers[ev.PeerID] = struct{}{} - if len(connectedPeers) == cfg.Count-1 { + if len(connectedPeers) == targetPeers { return nil } case <-timer.C: diff --git a/tm2/pkg/p2p/events/events.go b/tm2/pkg/p2p/events/events.go index bf76e27d91e..1eb4699fb45 100644 --- a/tm2/pkg/p2p/events/events.go +++ b/tm2/pkg/p2p/events/events.go @@ -68,7 +68,12 @@ type ( func (s *subscriptions) add(filterFn EventFilter) (string, chan Event) { var ( id = xid.New().String() - ch = make(chan Event, 1) + // Since the event stream is non-blocking, + // the event buffer should be sufficiently + // large for most use-cases. Subscribers can + // handle large event load caller-side to mitigate + // events potentially being missed + ch = make(chan Event, 100) ) (*s)[id] = subscription{ @@ -99,6 +104,9 @@ func (s *subscriptions) notify(event Event) { continue } - sub.ch <- event + select { + case sub.ch <- event: + default: // non-blocking + } } }