Skip to content

Commit

Permalink
feat: retrieves timeout propose and timeout commit dynamically per he…
Browse files Browse the repository at this point in the history
…ight according to the app version (#1494)

The celestia-core portion of
[#3859](celestiaorg/celestia-app#3859)

---------

Co-authored-by: evan-forbes <[email protected]>
  • Loading branch information
staheri14 and evan-forbes authored Oct 15, 2024
1 parent a263c2c commit c2c6463
Show file tree
Hide file tree
Showing 27 changed files with 1,028 additions and 406 deletions.
920 changes: 653 additions & 267 deletions abci/types/types.pb.go

Large diffs are not rendered by default.

3 changes: 1 addition & 2 deletions blockchain/v0/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,8 +410,7 @@ FOR_LOOP:
// TODO: batch saves so we dont persist to disk every block
bcR.store.SaveBlock(first, firstParts, second.LastCommit)

// TODO: same thing for app - but we would need a way to
// get the hash without persisting the state
// TODO: same thing for app - but we would need a way to get the hash without persisting the state
state, _, err = bcR.blockExec.ApplyBlock(state, firstID, first, second.LastCommit)
if err != nil {
// TODO This is bad, are we zombie?
Expand Down
42 changes: 36 additions & 6 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,14 +167,14 @@ type BaseConfig struct { //nolint: maligned
chainID string

// The root directory for all data.
// This should be set in viper so it can unmarshal into this struct
// This should be set in viper so that it can unmarshal into this struct
RootDir string `mapstructure:"home"`

// TCP or UNIX socket address of the ABCI application,
// or the name of an ABCI application compiled in with the CometBFT binary
ProxyApp string `mapstructure:"proxy_app"`

// A custom human readable name for this node
// A custom human-readable name for this node
Moniker string `mapstructure:"moniker"`

// If this node is many blocks behind the tip of the chain, FastSync
Expand Down Expand Up @@ -212,7 +212,7 @@ type BaseConfig struct { //nolint: maligned
// Output format: 'plain' (colored text) or 'json'
LogFormat string `mapstructure:"log_format"`

// Path to the JSON file containing the initial validator set and other meta data
// Path to the JSON file containing the initial validator set and other metadata
Genesis string `mapstructure:"genesis_file"`

// Path to the JSON file containing the private key to use as a validator in the consensus protocol
Expand Down Expand Up @@ -279,7 +279,7 @@ func (cfg BaseConfig) PrivValidatorKeyFile() string {
return rootify(cfg.PrivValidatorKey, cfg.RootDir)
}

// PrivValidatorStateFile returns the full path to the priv_validator_state.json file.
// The configured PrivValidatorState path is resolved against RootDir via rootify.
func (cfg BaseConfig) PrivValidatorStateFile() string {
	statePath := rootify(cfg.PrivValidatorState, cfg.RootDir)
	return statePath
}
Expand Down Expand Up @@ -871,7 +871,7 @@ func DefaultStateSyncConfig() *StateSyncConfig {
}
}

// TestStateSyncConfig returns the state sync configuration used in tests,
// which is the same value produced by DefaultStateSyncConfig.
func TestStateSyncConfig() *StateSyncConfig {
	stateSyncCfg := DefaultStateSyncConfig()
	return stateSyncCfg
}
Expand Down Expand Up @@ -1055,6 +1055,20 @@ func (cfg *ConsensusConfig) Propose(round int32) time.Duration {
) * time.Nanosecond
}

// ProposeWithCustomTimeout returns how long to wait for a proposal in the
// given round. It mirrors Propose, except that customTimeout is used as the
// base duration in place of cfg.TimeoutPropose.
//
// A customTimeout of 0 is treated as "not provided": in that case the base
// falls back to cfg.TimeoutPropose. cfg.TimeoutProposeDelta is added once per
// round on top of the base.
func (cfg *ConsensusConfig) ProposeWithCustomTimeout(round int32, customTimeout time.Duration) time.Duration {
	base := customTimeout
	if base == 0 {
		// Guards against unforeseen cases where no custom timeout was set.
		base = cfg.TimeoutPropose
	}
	// time.Duration is an int64 nanosecond count, so this is the same
	// arithmetic as summing the Nanoseconds() values explicitly.
	return base + cfg.TimeoutProposeDelta*time.Duration(round)
}

// Prevote returns the amount of time to wait for straggler votes after receiving any +2/3 prevotes
func (cfg *ConsensusConfig) Prevote(round int32) time.Duration {
return time.Duration(
Expand All @@ -1070,11 +1084,23 @@ func (cfg *ConsensusConfig) Precommit(round int32) time.Duration {
}

// Commit returns t advanced by cfg.TimeoutCommit, i.e. the point in time until
// which to wait for straggler votes after receiving +2/3 precommits for a
// single block (i.e., a commit).
func (cfg *ConsensusConfig) Commit(t time.Time) time.Time {
	deadline := t.Add(cfg.TimeoutCommit)
	return deadline
}

// CommitWithCustomTimeout mirrors Commit, except that the commit time is
// computed by adding customTimeout to t in place of cfg.TimeoutCommit.
//
// A customTimeout of 0 is treated as "not provided": in that case
// cfg.TimeoutCommit is added to t.
func (cfg *ConsensusConfig) CommitWithCustomTimeout(t time.Time, customTimeout time.Duration) time.Time {
	if customTimeout != 0 {
		return t.Add(customTimeout)
	}
	// Guards against unforeseen cases where no custom timeout was set:
	// fall back to the timeout from the config.
	return t.Add(cfg.TimeoutCommit)
}

// WalFile returns the full path to the write-ahead log file
func (cfg *ConsensusConfig) WalFile() string {
if cfg.walFile != "" {
Expand All @@ -1091,6 +1117,8 @@ func (cfg *ConsensusConfig) SetWalFile(walFile string) {
// ValidateBasic performs basic validation (checking param bounds, etc.) and
// returns an error if any check fails.
func (cfg *ConsensusConfig) ValidateBasic() error {
// TODO we may want to remove this check if TimeoutPropose is removed from
// the config
if cfg.TimeoutPropose < 0 {
return errors.New("timeout_propose can't be negative")
}
Expand All @@ -1109,6 +1137,8 @@ func (cfg *ConsensusConfig) ValidateBasic() error {
if cfg.TimeoutPrecommitDelta < 0 {
return errors.New("timeout_precommit_delta can't be negative")
}
// TODO we may want to remove this check if TimeoutCommit is removed from
// the config
if cfg.TimeoutCommit < 0 {
return errors.New("timeout_commit can't be negative")
}
Expand Down
28 changes: 28 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,3 +190,31 @@ func TestInstrumentationConfigValidateBasic(t *testing.T) {
cfg.MaxOpenConnections = -1
assert.Error(t, cfg.ValidateBasic())
}

// TestProposeWithCustomTimeout checks that ProposeWithCustomTimeout falls back
// to the configured TimeoutPropose when the custom timeout is 0, and uses the
// custom timeout as the base otherwise. In both cases the per-round
// TimeoutProposeDelta is added on top.
func TestProposeWithCustomTimeout(t *testing.T) {
	cfg := DefaultConsensusConfig()
	round := int32(1)

	cases := []struct {
		custom time.Duration // value passed as customTimeout
		base   time.Duration // base duration the result is expected to build on
	}{
		// customTimeout is 0: fall back to the default timeout
		{custom: time.Duration(0), base: cfg.TimeoutPropose},
		// customTimeout is not 0: it replaces the default timeout
		{custom: 2 * time.Second, base: 2 * time.Second},
	}

	for _, tc := range cases {
		want := tc.base + cfg.TimeoutProposeDelta*time.Duration(round)
		assert.Equal(t, want, cfg.ProposeWithCustomTimeout(round, tc.custom))
	}
}

// TestCommitWithCustomTimeout checks that CommitWithCustomTimeout adds the
// configured TimeoutCommit when the custom timeout is 0, and adds the custom
// timeout otherwise.
func TestCommitWithCustomTimeout(t *testing.T) {
	cfg := DefaultConsensusConfig()
	inputTime := time.Now()

	cases := []struct {
		custom time.Duration // value passed as customTimeout
		added  time.Duration // duration expected to be added to inputTime
	}{
		// customTimeout is 0: fall back to the default timeout
		{custom: time.Duration(0), added: cfg.TimeoutCommit},
		// customTimeout is not 0: it replaces the default timeout
		{custom: 2 * time.Second, added: 2 * time.Second},
	}

	for _, tc := range cases {
		want := inputTime.Add(tc.added)
		assert.Equal(t, want, cfg.CommitWithCustomTimeout(inputTime, tc.custom))
	}
}
8 changes: 6 additions & 2 deletions consensus/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestMempoolProgressInHigherRound(t *testing.T) {
timeoutCh := subscribe(cs.eventBus, types.EventQueryTimeoutPropose)
cs.setProposal = func(proposal *types.Proposal) error {
if cs.Height == 2 && cs.Round == 0 {
// dont set the proposal in round 0 so we timeout and
// don't set the proposal in round 0 so we timeout and
// go to next round
cs.Logger.Info("Ignoring set proposal at height 2, round 0")
return nil
Expand All @@ -91,7 +91,11 @@ func TestMempoolProgressInHigherRound(t *testing.T) {
round = 0

ensureNewRound(newRoundCh, height, round) // first round at next height
deliverTxsRange(cs, 0, 1) // we deliver txs, but dont set a proposal so we get the next round
deliverTxsRange(cs, 0, 1) // we deliver txs, but don't set a proposal so we get the next round
// Using cs.config.TimeoutPropose.Nanoseconds() as the expected propose timeout on the following line is acceptable in this test case.
// Even though timeouts are version-dependent, cs is created with an empty previous state in this scenario.
// As there's no timeout propose in the previous state, we default to the timeout propose in the config.
// This makes the test case valid.
ensureNewTimeout(timeoutCh, height, round, cs.config.TimeoutPropose.Nanoseconds())

round++ // moving to the next round
Expand Down
9 changes: 7 additions & 2 deletions consensus/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ LOOP:
}

// NOTE: since the priv key is set when the msgs are received
// it will attempt to eg double sign but we can just ignore it
// since the votes will be replayed and we'll get to the next step
// it will attempt to, e.g., double sign, but we can just ignore it
// since the votes will be replayed, and we'll get to the next step
if err := cs.readReplayMessage(msg, nil); err != nil {
return err
}
Expand Down Expand Up @@ -362,6 +362,11 @@ func (h *Handshaker) ReplayBlocksWithContext(
state.ConsensusParams = types.UpdateConsensusParams(state.ConsensusParams, res.ConsensusParams)
state.Version.Consensus.App = state.ConsensusParams.Version.AppVersion
}

// update timeouts based on the InitChainSync response
state.TimeoutCommit = res.Timeouts.TimeoutCommit
state.TimeoutPropose = res.Timeouts.TimeoutPropose

// We update the last results hash with the empty hash, to conform with RFC-6962.
state.LastResultsHash = merkle.HashFromByteSlices(nil)
if err := h.stateStore.Save(state); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions consensus/replay_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ const (
//--------------------------------------------------------
// replay messages interactively or all at once

// replay the wal file
// RunReplayFile replays the wal file
func RunReplayFile(config cfg.BaseConfig, csConfig *cfg.ConsensusConfig, console bool) {
consensusState := newConsensusStateForReplay(config, csConfig)

Expand All @@ -38,7 +38,7 @@ func RunReplayFile(config cfg.BaseConfig, csConfig *cfg.ConsensusConfig, console
}
}

// Replay msgs in file or start the console
// ReplayFile replays msgs in file or starts the console
func (cs *State) ReplayFile(file string, console bool) error {

if cs.IsRunning() {
Expand Down
24 changes: 18 additions & 6 deletions consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -674,9 +674,21 @@ func (cs *State) updateToState(state sm.State) {
// to be gathered for the first block.
// An alternative solution that relies on clocks:
// cs.StartTime = state.LastBlockTime.Add(timeoutCommit)
cs.StartTime = cs.config.Commit(cmttime.Now())

if state.LastBlockHeight == 0 {
// Don't use cs.state.TimeoutCommit because that is zero
cs.StartTime = cs.config.CommitWithCustomTimeout(cmttime.Now(), state.TimeoutCommit)
} else {
cs.StartTime = cs.config.CommitWithCustomTimeout(cmttime.Now(), cs.state.TimeoutCommit)
}

} else {
cs.StartTime = cs.config.Commit(cs.CommitTime)
if state.LastBlockHeight == 0 {
cs.StartTime = cs.config.CommitWithCustomTimeout(cs.CommitTime, state.TimeoutCommit)
} else {
cs.StartTime = cs.config.CommitWithCustomTimeout(cs.CommitTime, cs.state.TimeoutCommit)
}

}

cs.Validators = validators
Expand Down Expand Up @@ -1113,7 +1125,7 @@ func (cs *State) enterPropose(height int64, round int32) {
}()

// If we don't get the proposal and all block parts quick enough, enterPrevote
cs.scheduleTimeout(cs.config.Propose(round), height, round, cstypes.RoundStepPropose)
cs.scheduleTimeout(cs.config.ProposeWithCustomTimeout(round, cs.state.TimeoutPropose), height, round, cstypes.RoundStepPropose)

// Nothing more to do if we're not a validator
if cs.privValidator == nil {
Expand Down Expand Up @@ -1682,7 +1694,7 @@ func (cs *State) finalizeCommit(height int64) {
// exists.
//
// Either way, the State should not be resumed until we
// successfully call ApplyBlock (ie. later here, or in Handshake after
// successfully call ApplyBlock (i.e., later here, or in Handshake after
// restart).
endMsg := EndHeightMessage{height}
if err := cs.wal.WriteSync(endMsg); err != nil { // NOTE: fsync
Expand All @@ -1698,7 +1710,7 @@ func (cs *State) finalizeCommit(height int64) {
stateCopy := cs.state.Copy()

// Execute and commit the block, update and save the state, and update the mempool.
// NOTE The block.AppHash wont reflect these txs until the next block.
// NOTE The block.AppHash won't reflect these txs until the next block.
var (
err error
retainHeight int64
Expand Down Expand Up @@ -1741,7 +1753,7 @@ func (cs *State) finalizeCommit(height int64) {

fail.Fail() // XXX

// Private validator might have changed it's key pair => refetch pubkey.
// Private validator might have changed its key pair => refetch pubkey.
if err := cs.updatePrivValidatorPubKey(); err != nil {
logger.Error("failed to get private validator pubkey", "err", err)
}
Expand Down
24 changes: 16 additions & 8 deletions consensus/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ func TestStateEnterProposeNoPrivValidator(t *testing.T) {
startTestRound(cs, height, round)

// if we're not a validator, EnterPropose should timeout
// The use of cs.config.TimeoutPropose.Nanoseconds() as the timeout propose is acceptable in this test case.
// Even though timeouts are version-dependent, cs is created with an empty previous state in this scenario.
// As there's no timeout propose in the previous state, we default to the timeout propose in the config.
// This makes the test case valid.
ensureNewTimeout(timeoutCh, height, round, cs.config.TimeoutPropose.Nanoseconds())

if cs.GetRoundState().Proposal != nil {
Expand Down Expand Up @@ -179,6 +183,10 @@ func TestStateEnterProposeYesPrivValidator(t *testing.T) {
}

// if we're a validator, enterPropose should not timeout
// The use of cs.config.TimeoutPropose.Nanoseconds() as the timeout propose is acceptable in this test case.
// Even though timeouts are version-dependent, cs is created with an empty previous state in this scenario.
// As there's no timeout propose in the previous state, we default to the timeout propose in the config.
// This makes the test case valid.
ensureNoNewTimeout(timeoutCh, cs.config.TimeoutPropose.Nanoseconds())
}

Expand Down Expand Up @@ -310,7 +318,7 @@ func TestStateOversizedBlock(t *testing.T) {
lockedRound = -1
// if the block is oversized cs1 should log an error with the block part message as it exceeds
// the consensus params. The block is not added to cs.ProposalBlock so the node timeouts.
ensureNewTimeout(timeoutProposeCh, height, round, cs1.config.Propose(round).Nanoseconds())
ensureNewTimeout(timeoutProposeCh, height, round, cs1.config.ProposeWithCustomTimeout(round, cs1.state.TimeoutPropose).Nanoseconds())
// and then should send nil prevote and precommit regardless of whether other validators prevote and
// precommit on it
}
Expand Down Expand Up @@ -497,7 +505,7 @@ func TestStateLockNoPOL(t *testing.T) {
incrementRound(vs2)

// now we're on a new round and not the proposer, so wait for timeout
ensureNewTimeout(timeoutProposeCh, height, round, cs1.config.Propose(round).Nanoseconds())
ensureNewTimeout(timeoutProposeCh, height, round, cs1.config.ProposeWithCustomTimeout(round, cs1.state.TimeoutPropose).Nanoseconds())

rs := cs1.GetRoundState()

Expand Down Expand Up @@ -1029,7 +1037,7 @@ func TestStateLockPOLSafety1(t *testing.T) {
*/

// timeout of propose
ensureNewTimeout(timeoutProposeCh, height, round, cs1.config.Propose(round).Nanoseconds())
ensureNewTimeout(timeoutProposeCh, height, round, cs1.config.ProposeWithCustomTimeout(round, cs1.state.TimeoutPropose).Nanoseconds())

// finish prevote
ensurePrevote(voteCh, height, round)
Expand Down Expand Up @@ -1199,7 +1207,7 @@ func TestProposeValidBlock(t *testing.T) {
t.Log("### ONTO ROUND 2")

// timeout of propose
ensureNewTimeout(timeoutProposeCh, height, round, cs1.config.Propose(round).Nanoseconds())
ensureNewTimeout(timeoutProposeCh, height, round, cs1.config.ProposeWithCustomTimeout(round, cs1.state.TimeoutPropose).Nanoseconds())

ensurePrevote(voteCh, height, round)
validatePrevote(t, cs1, round, vss[0], propBlockHash)
Expand Down Expand Up @@ -1326,7 +1334,7 @@ func TestSetValidBlockOnDelayedProposal(t *testing.T) {
startTestRound(cs1, cs1.Height, round)
ensureNewRound(newRoundCh, height, round)

ensureNewTimeout(timeoutProposeCh, height, round, cs1.config.Propose(round).Nanoseconds())
ensureNewTimeout(timeoutProposeCh, height, round, cs1.config.ProposeWithCustomTimeout(round, cs1.state.TimeoutPropose).Nanoseconds())

ensurePrevote(voteCh, height, round)
validatePrevote(t, cs1, round, vss[0], nil)
Expand Down Expand Up @@ -1407,7 +1415,7 @@ func TestWaitingTimeoutProposeOnNewRound(t *testing.T) {
rs := cs1.GetRoundState()
assert.True(t, rs.Step == cstypes.RoundStepPropose) // P0 does not prevote before timeoutPropose expires

ensureNewTimeout(timeoutWaitCh, height, round, cs1.config.Propose(round).Nanoseconds())
ensureNewTimeout(timeoutWaitCh, height, round, cs1.config.ProposeWithCustomTimeout(round, cs1.state.TimeoutPropose).Nanoseconds())

ensurePrevote(voteCh, height, round)
validatePrevote(t, cs1, round, vss[0], nil)
Expand Down Expand Up @@ -1471,7 +1479,7 @@ func TestWaitTimeoutProposeOnNilPolkaForTheCurrentRound(t *testing.T) {
incrementRound(vss[1:]...)
signAddVotes(cs1, cmtproto.PrevoteType, nil, types.PartSetHeader{}, vs2, vs3, vs4)

ensureNewTimeout(timeoutProposeCh, height, round, cs1.config.Propose(round).Nanoseconds())
ensureNewTimeout(timeoutProposeCh, height, round, cs1.config.ProposeWithCustomTimeout(round, cs1.state.TimeoutPropose).Nanoseconds())

ensurePrevote(voteCh, height, round)
validatePrevote(t, cs1, round, vss[0], nil)
Expand Down Expand Up @@ -1619,7 +1627,7 @@ func TestStartNextHeightCorrectlyAfterTimeout(t *testing.T) {

cs1.txNotifier.(*fakeTxNotifier).Notify()

ensureNewTimeout(timeoutProposeCh, height+1, round, cs1.config.Propose(round).Nanoseconds())
ensureNewTimeout(timeoutProposeCh, height+1, round, cs1.config.ProposeWithCustomTimeout(round, cs1.state.TimeoutPropose).Nanoseconds())
rs = cs1.GetRoundState()
assert.False(
t,
Expand Down
Loading

0 comments on commit c2c6463

Please sign in to comment.