diff --git a/consensus_inputs.go b/consensus_inputs.go new file mode 100644 index 00000000..f557f68b --- /dev/null +++ b/consensus_inputs.go @@ -0,0 +1,230 @@ +package f3 + +import ( + "bytes" + "context" + "fmt" + "slices" + "time" + + "github.com/filecoin-project/go-f3/certs" + "github.com/filecoin-project/go-f3/certstore" + "github.com/filecoin-project/go-f3/ec" + "github.com/filecoin-project/go-f3/gpbft" + "github.com/filecoin-project/go-f3/internal/clock" + "github.com/filecoin-project/go-f3/manifest" + "go.opentelemetry.io/otel/metric" +) + +type gpbftInputs struct { + manifest *manifest.Manifest + certStore *certstore.Store + ec ec.Backend + verifier gpbft.Verifier + clock clock.Clock +} + +//ptCache lru.Cache[string, cid.Cid] +//} +//func (h *gpbftInputs) getPowerTableCIDForTipset(ctx context.Context, tsk gpbft.TipSetKey) { + +//} + +func (h *gpbftInputs) collectChain(ctx context.Context, base ec.TipSet, head ec.TipSet) ([]ec.TipSet, error) { + // TODO: optimize when head is way beyond base + res := make([]ec.TipSet, 0, 2*gpbft.ChainMaxLen) + res = append(res, head) + + current := head + for !bytes.Equal(current.Key(), base.Key()) { + if current.Epoch() < base.Epoch() { + metrics.headDiverged.Add(ctx, 1) + log.Infow("reorg-ed away from base, proposing just base", + "head", head.String(), "base", base.String()) + return nil, nil + } + var err error + current, err = h.ec.GetParent(ctx, current) + if err != nil { + return nil, fmt.Errorf("walking back the chain: %w", err) + } + res = append(res, current) + } + slices.Reverse(res) + return res[1:], nil +} + +// Returns inputs to the next GPBFT instance. +// These are: +// - the supplemental data. +// - the EC chain to propose. +// These will be used as input to a subsequent instance of the protocol. +// The chain should be a suffix of the last chain notified to the host via +// ReceiveDecision (or known to be final via some other channel). +func (h *gpbftInputs) GetProposal(ctx context.Context, instance uint64) (_ *gpbft.SupplementalData, _ gpbft.ECChain, _err error) { + defer func(start time.Time) { + metrics.proposalFetchTime.Record(context.TODO(), time.Since(start).Seconds(), metric.WithAttributes(attrStatusFromErr(_err))) + }(time.Now()) + + var baseTsk gpbft.TipSetKey + if instance == h.manifest.InitialInstance { + ts, err := h.ec.GetTipsetByEpoch(ctx, + h.manifest.BootstrapEpoch-h.manifest.EC.Finality) + if err != nil { + return nil, nil, fmt.Errorf("getting boostrap base: %w", err) + } + baseTsk = ts.Key() + } else { + cert, err := h.certStore.Get(ctx, instance-1) + if err != nil { + return nil, nil, fmt.Errorf("getting cert for previous instance(%d): %w", instance-1, err) + } + baseTsk = cert.ECChain.Head().Key + } + + baseTs, err := h.ec.GetTipset(ctx, baseTsk) + if err != nil { + return nil, nil, fmt.Errorf("getting base TS: %w", err) + } + headTs, err := h.ec.GetHead(ctx) + if err != nil { + return nil, nil, fmt.Errorf("getting head TS: %w", err) + } + + collectedChain, err := h.collectChain(ctx, baseTs, headTs) + if err != nil { + return nil, nil, fmt.Errorf("collecting chain: %w", err) + } + + // If we have an explicit head-lookback, trim the chain. + if h.manifest.EC.HeadLookback > 0 { + collectedChain = collectedChain[:max(0, len(collectedChain)-h.manifest.EC.HeadLookback)] + } + + // less than ECPeriod since production of the head agreement is unlikely, trim the chain. + if len(collectedChain) > 0 && h.clock.Since(collectedChain[len(collectedChain)-1].Timestamp()) < h.manifest.EC.Period { + collectedChain = collectedChain[:len(collectedChain)-1] + } + + base := gpbft.TipSet{ + Epoch: baseTs.Epoch(), + Key: baseTs.Key(), + } + pte, err := h.ec.GetPowerTable(ctx, baseTs.Key()) + if err != nil { + return nil, nil, fmt.Errorf("getting power table for base: %w", err) + } + base.PowerTable, err = certs.MakePowerTableCID(pte) + if err != nil { + return nil, nil, fmt.Errorf("computing powertable CID for base: %w", err) + } + + suffix := make([]gpbft.TipSet, min(gpbft.ChainMaxLen-1, len(collectedChain))) // -1 because of base + for i := range suffix { + suffix[i].Key = collectedChain[i].Key() + suffix[i].Epoch = collectedChain[i].Epoch() + + pte, err = h.ec.GetPowerTable(ctx, suffix[i].Key) + if err != nil { + return nil, nil, fmt.Errorf("getting power table for suffix %d: %w", i, err) + } + suffix[i].PowerTable, err = certs.MakePowerTableCID(pte) + if err != nil { + return nil, nil, fmt.Errorf("computing powertable CID for base: %w", err) + } + } + chain, err := gpbft.NewChain(base, suffix...) + if err != nil { + return nil, nil, fmt.Errorf("making new chain: %w", err) + } + + var supplData gpbft.SupplementalData + committee, err := h.GetCommittee(ctx, instance+1) + if err != nil { + return nil, nil, fmt.Errorf("getting commite for %d: %w", instance+1, err) + } + + supplData.PowerTable, err = certs.MakePowerTableCID(committee.PowerTable.Entries) + if err != nil { + return nil, nil, fmt.Errorf("making power table cid for supplemental data: %w", err) + } + + return &supplData, chain, nil +} + +func (h *gpbftInputs) GetCommittee(ctx context.Context, instance uint64) (_ *gpbft.Committee, _err error) { + defer func(start time.Time) { + metrics.committeeFetchTime.Record(context.TODO(), time.Since(start).Seconds(), metric.WithAttributes(attrStatusFromErr(_err))) + }(time.Now()) + + var powerTsk gpbft.TipSetKey + var powerEntries gpbft.PowerEntries + var err error + + if instance < h.manifest.InitialInstance+h.manifest.CommitteeLookback { + //boostrap phase + powerEntries, err = h.certStore.GetPowerTable(ctx, h.manifest.InitialInstance) + if err != nil { + return nil, fmt.Errorf("getting power table: %w", err) + } + if h.certStore.Latest() == nil { + ts, err := h.ec.GetTipsetByEpoch(ctx, h.manifest.BootstrapEpoch-h.manifest.EC.Finality) + if err != nil { + return nil, fmt.Errorf("getting tipset for boostrap epoch with lookback: %w", err) + } + powerTsk = ts.Key() + } else { + cert, err := h.certStore.Get(ctx, h.manifest.InitialInstance) + if err != nil { + return nil, fmt.Errorf("getting finality certificate: %w", err) + } + powerTsk = cert.ECChain.Base().Key + } + } else { + cert, err := h.certStore.Get(ctx, instance-h.manifest.CommitteeLookback) + if err != nil { + return nil, fmt.Errorf("getting finality certificate: %w", err) + } + powerTsk = cert.ECChain.Head().Key + + powerEntries, err = h.certStore.GetPowerTable(ctx, instance) + if err != nil { + log.Debugf("failed getting power table from certstore: %v, falling back to EC", err) + + powerEntries, err = h.ec.GetPowerTable(ctx, powerTsk) + if err != nil { + return nil, fmt.Errorf("getting power table: %w", err) + } + } + } + + ts, err := h.ec.GetTipset(ctx, powerTsk) + if err != nil { + return nil, fmt.Errorf("getting tipset: %w", err) + } + + table := gpbft.NewPowerTable() + if err := table.Add(powerEntries...); err != nil { + return nil, fmt.Errorf("adding entries to power table: %w", err) + } + if err := table.Validate(); err != nil { + return nil, fmt.Errorf("invalid power table for instance %d: %w", instance, err) + } + + // NOTE: we're intentionally keeping participants here even if they have no + // effective power (after rounding power) to simplify things. The runtime cost is + // minimal and it means that the keys can be aggregated before any rounding is done. + // TODO: this is slow and under a lock, but we only want to do it once per + // instance... ideally we'd have a per-instance lock/once, but that probably isn't + // worth it. + agg, err := h.verifier.Aggregate(table.Entries.PublicKeys()) + if err != nil { + return nil, fmt.Errorf("failed to pre-compute aggregate mask for instance %d: %w", instance, err) + } + + return &gpbft.Committee{ + PowerTable: table, + Beacon: ts.Beacon(), + AggregateVerifier: agg, + }, nil +} diff --git a/host.go b/host.go index 995e48ac..e6430dd4 100644 --- a/host.go +++ b/host.go @@ -19,7 +19,6 @@ import ( "github.com/filecoin-project/go-f3/manifest" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/peer" - "go.opentelemetry.io/otel/metric" "go.uber.org/multierr" "golang.org/x/sync/errgroup" ) @@ -51,6 +50,8 @@ type gpbftRunner struct { // msgsMutex guards access to selfMessages msgsMutex sync.Mutex selfMessages map[uint64]map[roundPhase][]*gpbft.GMessage + + inputs gpbftInputs } type roundPhase struct { @@ -77,7 +78,7 @@ func newRunner( manifest: m, ec: ec, pubsub: ps, - clock: clock.GetClock(runningCtx), + clock: clock.GetClock(ctx), verifier: verifier, wal: wal, outMessages: out, @@ -86,6 +87,13 @@ func newRunner( ctxCancel: ctxCancel, equivFilter: newEquivocationFilter(pID), selfMessages: make(map[uint64]map[roundPhase][]*gpbft.GMessage), + inputs: gpbftInputs{ + manifest: m, + certStore: cs, + ec: ec, + verifier: verifier, + clock: clock.GetClock(ctx), + }, } // create a stopped timer to facilitate alerts requested from gpbft @@ -620,28 +628,12 @@ func (h *gpbftHost) RequestRebroadcast(instant gpbft.Instant) error { return err } -func (h *gpbftHost) collectChain(base ec.TipSet, head ec.TipSet) ([]ec.TipSet, error) { - // TODO: optimize when head is way beyond base - res := make([]ec.TipSet, 0, 2*gpbft.ChainMaxLen) - res = append(res, head) - - current := head - for !bytes.Equal(current.Key(), base.Key()) { - if current.Epoch() < base.Epoch() { - metrics.headDiverged.Add(h.runningCtx, 1) - log.Infow("reorg-ed away from base, proposing just base", - "head", head.String(), "base", base.String()) - return nil, nil - } - var err error - current, err = h.ec.GetParent(h.runningCtx, current) - if err != nil { - return nil, fmt.Errorf("walking back the chain: %w", err) - } - res = append(res, current) - } - slices.Reverse(res) - return res[1:], nil +func (h *gpbftHost) GetProposal(instance uint64) (*gpbft.SupplementalData, gpbft.ECChain, error) { + return h.inputs.GetProposal(h.runningCtx, instance) +} + +func (h *gpbftHost) GetCommittee(instance uint64) (*gpbft.Committee, error) { + return h.inputs.GetCommittee(h.runningCtx, instance) } func (h *gpbftRunner) Stop(context.Context) error { @@ -661,181 +653,6 @@ func (h *gpbftRunner) Progress() gpbft.Instant { return h.participant.Progress() } -// Returns inputs to the next GPBFT instance. -// These are: -// - the supplemental data. -// - the EC chain to propose. -// These will be used as input to a subsequent instance of the protocol. -// The chain should be a suffix of the last chain notified to the host via -// ReceiveDecision (or known to be final via some other channel). -func (h *gpbftHost) GetProposal(instance uint64) (_ *gpbft.SupplementalData, _ gpbft.ECChain, _err error) { - defer func(start time.Time) { - metrics.proposalFetchTime.Record(context.TODO(), time.Since(start).Seconds(), metric.WithAttributes(attrStatusFromErr(_err))) - }(time.Now()) - - var baseTsk gpbft.TipSetKey - if instance == h.manifest.InitialInstance { - ts, err := h.ec.GetTipsetByEpoch(h.runningCtx, - h.manifest.BootstrapEpoch-h.manifest.EC.Finality) - if err != nil { - return nil, nil, fmt.Errorf("getting boostrap base: %w", err) - } - baseTsk = ts.Key() - } else { - cert, err := h.certStore.Get(h.runningCtx, instance-1) - if err != nil { - return nil, nil, fmt.Errorf("getting cert for previous instance(%d): %w", instance-1, err) - } - baseTsk = cert.ECChain.Head().Key - } - - baseTs, err := h.ec.GetTipset(h.runningCtx, baseTsk) - if err != nil { - return nil, nil, fmt.Errorf("getting base TS: %w", err) - } - headTs, err := h.ec.GetHead(h.runningCtx) - if err != nil { - return nil, nil, fmt.Errorf("getting head TS: %w", err) - } - - collectedChain, err := h.collectChain(baseTs, headTs) - if err != nil { - return nil, nil, fmt.Errorf("collecting chain: %w", err) - } - - // If we have an explicit head-lookback, trim the chain. - if h.manifest.EC.HeadLookback > 0 { - collectedChain = collectedChain[:max(0, len(collectedChain)-h.manifest.EC.HeadLookback)] - } - - // less than ECPeriod since production of the head agreement is unlikely, trim the chain. - if len(collectedChain) > 0 && h.clock.Since(collectedChain[len(collectedChain)-1].Timestamp()) < h.manifest.EC.Period { - collectedChain = collectedChain[:len(collectedChain)-1] - } - - base := gpbft.TipSet{ - Epoch: baseTs.Epoch(), - Key: baseTs.Key(), - } - pte, err := h.ec.GetPowerTable(h.runningCtx, baseTs.Key()) - if err != nil { - return nil, nil, fmt.Errorf("getting power table for base: %w", err) - } - base.PowerTable, err = certs.MakePowerTableCID(pte) - if err != nil { - return nil, nil, fmt.Errorf("computing powertable CID for base: %w", err) - } - - suffix := make([]gpbft.TipSet, min(gpbft.ChainMaxLen-1, len(collectedChain))) // -1 because of base - for i := range suffix { - suffix[i].Key = collectedChain[i].Key() - suffix[i].Epoch = collectedChain[i].Epoch() - - pte, err = h.ec.GetPowerTable(h.runningCtx, suffix[i].Key) - if err != nil { - return nil, nil, fmt.Errorf("getting power table for suffix %d: %w", i, err) - } - suffix[i].PowerTable, err = certs.MakePowerTableCID(pte) - if err != nil { - return nil, nil, fmt.Errorf("computing powertable CID for base: %w", err) - } - } - chain, err := gpbft.NewChain(base, suffix...) - if err != nil { - return nil, nil, fmt.Errorf("making new chain: %w", err) - } - - var supplData gpbft.SupplementalData - committee, err := h.GetCommittee(instance + 1) - if err != nil { - return nil, nil, fmt.Errorf("getting commite for %d: %w", instance+1, err) - } - - supplData.PowerTable, err = certs.MakePowerTableCID(committee.PowerTable.Entries) - if err != nil { - return nil, nil, fmt.Errorf("making power table cid for supplemental data: %w", err) - } - - return &supplData, chain, nil -} - -func (h *gpbftHost) GetCommittee(instance uint64) (_ *gpbft.Committee, _err error) { - defer func(start time.Time) { - metrics.committeeFetchTime.Record(context.TODO(), time.Since(start).Seconds(), metric.WithAttributes(attrStatusFromErr(_err))) - }(time.Now()) - - var powerTsk gpbft.TipSetKey - var powerEntries gpbft.PowerEntries - var err error - - if instance < h.manifest.InitialInstance+h.manifest.CommitteeLookback { - //boostrap phase - powerEntries, err = h.certStore.GetPowerTable(h.runningCtx, h.manifest.InitialInstance) - if err != nil { - return nil, fmt.Errorf("getting power table: %w", err) - } - if h.certStore.Latest() == nil { - ts, err := h.ec.GetTipsetByEpoch(h.runningCtx, h.manifest.BootstrapEpoch-h.manifest.EC.Finality) - if err != nil { - return nil, fmt.Errorf("getting tipset for boostrap epoch with lookback: %w", err) - } - powerTsk = ts.Key() - } else { - cert, err := h.certStore.Get(h.runningCtx, h.manifest.InitialInstance) - if err != nil { - return nil, fmt.Errorf("getting finality certificate: %w", err) - } - powerTsk = cert.ECChain.Base().Key - } - } else { - cert, err := h.certStore.Get(h.runningCtx, instance-h.manifest.CommitteeLookback) - if err != nil { - return nil, fmt.Errorf("getting finality certificate: %w", err) - } - powerTsk = cert.ECChain.Head().Key - - powerEntries, err = h.certStore.GetPowerTable(h.runningCtx, instance) - if err != nil { - log.Debugf("failed getting power table from certstore: %v, falling back to EC", err) - - powerEntries, err = h.ec.GetPowerTable(h.runningCtx, powerTsk) - if err != nil { - return nil, fmt.Errorf("getting power table: %w", err) - } - } - } - - ts, err := h.ec.GetTipset(h.runningCtx, powerTsk) - if err != nil { - return nil, fmt.Errorf("getting tipset: %w", err) - } - - table := gpbft.NewPowerTable() - if err := table.Add(powerEntries...); err != nil { - return nil, fmt.Errorf("adding entries to power table: %w", err) - } - if err := table.Validate(); err != nil { - return nil, fmt.Errorf("invalid power table for instance %d: %w", instance, err) - } - - // NOTE: we're intentionally keeping participants here even if they have no - // effective power (after rounding power) to simplify things. The runtime cost is - // minimal and it means that the keys can be aggregated before any rounding is done. - // TODO: this is slow and under a lock, but we only want to do it once per - // instance... ideally we'd have a per-instance lock/once, but that probably isn't - // worth it. - agg, err := h.Aggregate(table.Entries.PublicKeys()) - if err != nil { - return nil, fmt.Errorf("failed to pre-compute aggregate mask for instance %d: %w", instance, err) - } - - return &gpbft.Committee{ - PowerTable: table, - Beacon: ts.Beacon(), - AggregateVerifier: agg, - }, nil -} - // Returns the network's name (for signature separation) func (h *gpbftHost) NetworkName() gpbft.NetworkName { return h.manifest.NetworkName