Skip to content

Commit

Permalink
Add non-blocking to p2p event streams
Browse files Browse the repository at this point in the history
  • Loading branch information
zivkovicmilos committed Jan 7, 2025
1 parent f245127 commit b8cc3cf
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 5 deletions.
9 changes: 6 additions & 3 deletions tm2/pkg/internal/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,13 @@ func MakeConnectedPeers(
multiplexSwitch.DialPeers(addrs...)

// Set up an exit timer
timer := time.NewTimer(5 * time.Second)
timer := time.NewTimer(1 * time.Minute)
defer timer.Stop()

connectedPeers := make(map[p2pTypes.ID]struct{})
var (
connectedPeers = make(map[p2pTypes.ID]struct{})
targetPeers = cfg.Count - 1
)

for {
select {
Expand All @@ -143,7 +146,7 @@ func MakeConnectedPeers(

connectedPeers[ev.PeerID] = struct{}{}

if len(connectedPeers) == cfg.Count-1 {
if len(connectedPeers) == targetPeers {
return nil
}
case <-timer.C:
Expand Down
12 changes: 10 additions & 2 deletions tm2/pkg/p2p/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,12 @@ type (
func (s *subscriptions) add(filterFn EventFilter) (string, chan Event) {
var (
id = xid.New().String()
ch = make(chan Event, 1)
// Since the event stream is non-blocking,
// the event buffer should be sufficiently
// large for most use-cases. Subscribers can
// handle large event load caller-side to mitigate
// events potentially being missed
ch = make(chan Event, 100)
)

(*s)[id] = subscription{
Expand Down Expand Up @@ -99,6 +104,9 @@ func (s *subscriptions) notify(event Event) {
continue
}

sub.ch <- event
select {
case sub.ch <- event:
default: // non-blocking
}
}
}

0 comments on commit b8cc3cf

Please sign in to comment.