Skip to content

Commit

Permalink
add peer rate limit
Browse files Browse the repository at this point in the history
  • Loading branch information
Liuhaai committed Feb 2, 2025
1 parent 489210d commit 64f0241
Showing 1 changed file with 70 additions and 6 deletions.
76 changes: 70 additions & 6 deletions p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/core/transport"
"github.com/libp2p/go-libp2p/p2p/discovery/routing"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
yamux "github.com/libp2p/go-libp2p/p2p/muxer/yamux"
connmgr "github.com/libp2p/go-libp2p/p2p/net/connmgr"
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
Expand Down Expand Up @@ -69,6 +70,8 @@ type (
MaxPeer int `yaml:"maxPeer"`
MaxMessageSize int `yaml:"maxMessageSize"`
BlacklistTolerance int `yaml:"blacklistTolerance"`
MaxConn int `yaml:"maxConn"`
MaxStream int `yaml:"maxStream"`
}

// RateLimitConfig all numbers are per second value.
Expand Down Expand Up @@ -259,6 +262,17 @@ func WithMaxMessageSize(size int) Option {
}
}

func WithMaxConnAndStream(conn, stream uint32) Option {
return func(cfg *Config) error {
if conn == 0 || stream == 0 {
panic("conn and stream should be greater than 0")
}
cfg.MaxConn = int(conn)
cfg.MaxStream = int(stream)
return nil
}
}

// Host is the main struct that represents a host that communicating with the rest of the P2P networks
type Host struct {
host core.Host
Expand All @@ -277,6 +291,30 @@ type Host struct {
peerManager *peerManager
}

var (
_count = uint64(0)
)

func p2pMessageInspector(h *Host) func(peerID peer.ID, msg *pubsub.RPC) error {
return func(peerID peer.ID, msg *pubsub.RPC) error {
allowed, err := h.allowSource(peerID)
if err != nil {
Logger().Error("Error when checking if the source is allowed.", zap.Error(err))
return err
}
if !allowed {
_count = (_count + 1) % 1000
if _count == 0 {
Logger().Warn("message from p2p peer hit rate limit", zap.Any("peer id", peerID))
}
// h.peerManager.TryBlockPeer(peerID)
return errors.New("drop message")
}

return nil
}
}

// NewHost constructs a host struct
func NewHost(ctx context.Context, options ...Option) (*Host, error) {
cfg := DefaultConfig
Expand Down Expand Up @@ -343,6 +381,31 @@ func NewHost(ctx context.Context, options ...Option) (*Host, error) {
libp2p.DefaultPrometheusRegisterer,
}

if cfg.MaxConn > 0 {
// Set limits for libp2p services
scalingLimits := rcmgr.DefaultLimits
libp2p.SetDefaultServiceLimits(&scalingLimits)
scaledDefaultLimits := scalingLimits.AutoScale()
rcmgrCfg := rcmgr.PartialLimitConfig{
System: rcmgr.ResourceLimits{
Conns: rcmgr.LimitVal(2 * cfg.MaxConn),
ConnsInbound: rcmgr.LimitVal(cfg.MaxConn),
ConnsOutbound: rcmgr.LimitVal(cfg.MaxConn),
Streams: rcmgr.LimitVal(2 * cfg.MaxStream),
StreamsInbound: rcmgr.LimitVal(cfg.MaxStream),
StreamsOutbound: rcmgr.LimitVal(cfg.MaxStream),
},
}
rm, err := rcmgr.NewResourceManager(
rcmgr.NewFixedLimiter(rcmgrCfg.Build(scaledDefaultLimits)),
rcmgr.WithMetricsDisabled(),
)
if err != nil {
return nil, err
}
opts = append(opts, libp2p.ResourceManager(rm))
}

if !cfg.SecureIO {
opts = append(opts, libp2p.NoSecurity)
}
Expand Down Expand Up @@ -397,17 +460,12 @@ func NewHost(ctx context.Context, options ...Option) (*Host, error) {
if err != nil {
return nil, err
}
ps, err := newPubSub(ctx, host, pubsub.WithBlacklist(blacklist), pubsub.WithMaxMessageSize(cfg.MaxMessageSize))
if err != nil {
return nil, err
}
myHost := Host{
host: host,
cfg: cfg,
topics: make(map[string]bool),
kad: kad,
kadKey: cid,
pubsub: ps,
pubs: make(map[string]*pubsub.Topic),
blacklist: blacklist,
subs: make(map[string]*pubsub.Subscription),
Expand All @@ -418,7 +476,13 @@ func NewHost(ctx context.Context, options ...Option) (*Host, error) {
peerManager: newPeerManager(host, routing.NewRoutingDiscovery(kad), cfg.GroupID,
withMaxPeers(cfg.MaxPeer), withBlacklistTolerance(cfg.BlacklistTolerance), withBlacklistTimeout(cfg.BlackListTimeout)),
}

myHost.pubsub, err = newPubSub(ctx, host,
pubsub.WithBlacklist(blacklist),
pubsub.WithMaxMessageSize(cfg.MaxMessageSize),
pubsub.WithAppSpecificRpcInspector(p2pMessageInspector(&myHost)))
if err != nil {
return nil, err
}
addrs := make([]string, 0)
for _, ma := range myHost.Addresses() {
addrs = append(addrs, ma.String())
Expand Down

0 comments on commit 64f0241

Please sign in to comment.