-
Notifications
You must be signed in to change notification settings - Fork 7
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Implement Chain Exchange protocol over pubsub
Implement chain exchange protocol over pubsub as a mechanism to propagate `ECChain` across the network with reasonable spam protection. To protect against spam the mechanism employs two separate caches for chains that are generally discovered across the network and the ones explicitly looked up or broadcasted by the local node. Both caches are capped LRU, where the LRU recent-ness is used as a way to prioritise chains we cache while keeping the total memory footprint fixed. This approach is not the most memory efficient but is simpler to implement as the LRU encapsulates a lot of the complexity. The code has a lot of TODOs as places to improve or question to the reviewer. To action most of the TODOs further refactoring across the code is needed which is intended to be actioned in separate commits. The code path introduced here is not integrated into F3 host; future PRs will iteratively integrate the mechanism across F3 host and other places. Part of #792
- Loading branch information
Showing
7 changed files
with
675 additions
and
0 deletions.
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
package chainexchange | ||
|
||
import ( | ||
"context" | ||
|
||
"github.com/filecoin-project/go-f3/gpbft" | ||
) | ||
|
||
type Key []byte | ||
|
||
type Keyer interface { | ||
Key(gpbft.ECChain) Key | ||
} | ||
|
||
type Message struct { | ||
Instance uint64 | ||
Chain gpbft.ECChain | ||
} | ||
|
||
type ChainExchange interface { | ||
Keyer | ||
Broadcast(context.Context, Message) error | ||
GetChainByInstance(context.Context, uint64, Key) (gpbft.ECChain, bool) | ||
RemoveChainsByInstance(context.Context, uint64) error | ||
} | ||
|
||
func (k Key) IsZero() bool { return len(k) == 0 } |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,134 @@ | ||
package chainexchange | ||
|
||
import ( | ||
"errors" | ||
|
||
"github.com/filecoin-project/go-f3/gpbft" | ||
"github.com/filecoin-project/go-f3/internal/psutil" | ||
"github.com/filecoin-project/go-f3/manifest" | ||
pubsub "github.com/libp2p/go-libp2p-pubsub" | ||
) | ||
|
||
type Option func(*options) error | ||
|
||
type options struct { | ||
topicName string | ||
topicScoreParams *pubsub.TopicScoreParams | ||
subscriptionBufferSize int | ||
pubsub *pubsub.PubSub | ||
progress gpbft.Progress | ||
maxChainLength int | ||
maxInstanceLookahead uint64 | ||
maxDiscoveredChainsPerInstance int | ||
maxWantedChainsPerInstance int | ||
} | ||
|
||
func newOptions(o ...Option) (*options, error) { | ||
opts := &options{ | ||
topicScoreParams: psutil.PubsubTopicScoreParams, | ||
subscriptionBufferSize: 32, | ||
maxChainLength: gpbft.ChainMaxLen, | ||
maxInstanceLookahead: manifest.DefaultCommitteeLookback, | ||
maxDiscoveredChainsPerInstance: 1000, | ||
maxWantedChainsPerInstance: 1000, | ||
} | ||
for _, apply := range o { | ||
if err := apply(opts); err != nil { | ||
return nil, err | ||
} | ||
} | ||
if opts.progress == nil { | ||
return nil, errors.New("gpbft progress must be set") | ||
} | ||
if opts.pubsub == nil { | ||
return nil, errors.New("pubsub must be set") | ||
} | ||
if opts.topicName == "" { | ||
return nil, errors.New("topic name must be set") | ||
} | ||
return opts, nil | ||
} | ||
|
||
func WithTopicName(name string) Option { | ||
return func(o *options) error { | ||
if name == "" { | ||
return errors.New("topic name cannot be empty") | ||
} | ||
o.topicName = name | ||
return nil | ||
} | ||
} | ||
|
||
func WithTopicScoreParams(params *pubsub.TopicScoreParams) Option { | ||
return func(o *options) error { | ||
o.topicScoreParams = params | ||
return nil | ||
} | ||
} | ||
|
||
func WithSubscriptionBufferSize(size int) Option { | ||
return func(o *options) error { | ||
if size < 1 { | ||
return errors.New("subscription buffer size must be at least 1") | ||
} | ||
o.subscriptionBufferSize = size | ||
return nil | ||
} | ||
} | ||
|
||
func WithPubSub(pubsub *pubsub.PubSub) Option { | ||
return func(o *options) error { | ||
if pubsub == nil { | ||
return errors.New("pubsub cannot be nil") | ||
} | ||
o.pubsub = pubsub | ||
return nil | ||
} | ||
} | ||
|
||
func WithProgress(progress gpbft.Progress) Option { | ||
return func(o *options) error { | ||
if progress == nil { | ||
return errors.New("progress cannot be nil") | ||
} | ||
o.progress = progress | ||
return nil | ||
} | ||
} | ||
|
||
func WithMaxChainLength(length int) Option { | ||
return func(o *options) error { | ||
if length < 1 { | ||
return errors.New("max chain length must be at least 1") | ||
} | ||
o.maxChainLength = length | ||
return nil | ||
} | ||
} | ||
|
||
func WithMaxInstanceLookahead(lookahead uint64) Option { | ||
return func(o *options) error { | ||
o.maxInstanceLookahead = lookahead | ||
return nil | ||
} | ||
} | ||
|
||
func WithMaxDiscoveredChainsPerInstance(max int) Option { | ||
return func(o *options) error { | ||
if max < 1 { | ||
return errors.New("max discovered chains per instance must be at least 1") | ||
} | ||
o.maxDiscoveredChainsPerInstance = max | ||
return nil | ||
} | ||
} | ||
|
||
func WithMaxWantedChainsPerInstance(max int) Option { | ||
return func(o *options) error { | ||
if max < 1 { | ||
return errors.New("max wanted chains per instance must be at least 1") | ||
} | ||
o.maxWantedChainsPerInstance = max | ||
return nil | ||
} | ||
} |
Oops, something went wrong.