Skip to content

Commit

Permalink
Merge pull request #21 from iotaledger/fix/shutdown
Browse files Browse the repository at this point in the history
Fix shutdown deadlock
  • Loading branch information
muXxer authored Dec 18, 2023
2 parents b0bdefd + 53ed22e commit d10a77b
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 24 deletions.
2 changes: 1 addition & 1 deletion components/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ var (
Name = "inx-validator"

// Version of the app.
Version = "1.0.0-alpha.10"
Version = "1.0.0-alpha.11"
)

func App() *app.App {
Expand Down
5 changes: 3 additions & 2 deletions components/validator/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,15 @@ func run() error {

<-ctx.Done()

executor.Shutdown()
// we can cancel the pending elements here, because we don't want to issue any more blocks
executor.Shutdown(timed.CancelPendingElements)

Component.LogInfo("Stopping Validator... done")
}, daemon.PriorityStopValidator)
}

func checkValidatorStatus(ctx context.Context) {
isAccountValidator, err := deps.NodeBridge.ReadIsValidatorAccount(ctx, validatorAccount.ID(), deps.NodeBridge.NodeStatus().LatestCommitment.CommitmentId.Unwrap().Slot())
isAccountValidator, err := readIsValidatorAccount(ctx, validatorAccount.ID(), deps.NodeBridge.NodeStatus().LatestCommitment.CommitmentId.Unwrap().Slot())
if err != nil {
Component.LogErrorf("error when retrieving Validator account %s: %w", validatorAccount.ID(), err)

Expand Down
49 changes: 28 additions & 21 deletions components/validator/issuer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@ func candidateAction(ctx context.Context) {
currentAPI := deps.NodeBridge.APIProvider().APIForTime(now)
currentSlot := currentAPI.TimeProvider().SlotFromTime(now)

isCandidate, err := deps.NodeBridge.ReadIsCandidate(ctx, validatorAccount.ID(), currentSlot)
isCandidate, err := readIsCandidate(ctx, validatorAccount.ID(), currentSlot)
if err != nil {
Component.LogWarnf("error while checking if account is already a candidate: %s", err.Error())
// If there is an error, then retry registering as a candidate.
executor.ExecuteAt(CandidateTask, func() { candidateAction(ctx) }, now.Add(ParamsValidator.CandidacyRetryInterval))

// If there is an error, then retry registering as a candidate, except if the context was canceled.
if !ierrors.Is(err, context.Canceled) {
executor.ExecuteAt(CandidateTask, func() { candidateAction(ctx) }, now.Add(ParamsValidator.CandidacyRetryInterval))
}

return
}
Expand Down Expand Up @@ -80,10 +83,14 @@ func committeeMemberAction(ctx context.Context) {

// If we are bootstrapped let's check if we are part of the committee.
if deps.NodeBridge.NodeStatus().GetIsBootstrapped() {
isCommitteeMember, err := deps.NodeBridge.ReadIsCommitteeMember(ctx, validatorAccount.ID(), currentSlot)
isCommitteeMember, err := readIsCommitteeMember(ctx, validatorAccount.ID(), currentSlot)
if err != nil {
Component.LogWarnf("error while checking if account %s is a committee member in slot %d: %s", validatorAccount.ID(), currentSlot, err.Error())
executor.ExecuteAt(CommitteeTask, func() { committeeMemberAction(ctx) }, now.Add(committeeBroadcastInterval))

// If there is an error, then retry, except if the context was canceled.
if !ierrors.Is(err, context.Canceled) {
executor.ExecuteAt(CommitteeTask, func() { committeeMemberAction(ctx) }, now.Add(committeeBroadcastInterval))
}

return
}
Expand Down Expand Up @@ -117,7 +124,7 @@ func committeeMemberAction(ctx context.Context) {
func issueCandidateBlock(ctx context.Context, blockIssuingTime time.Time, currentAPI iotago.API) error {
blockSlot := currentAPI.TimeProvider().SlotFromTime(blockIssuingTime)

strongParents, weakParents, shallowLikeParents, err := deps.NodeBridge.RequestTips(ctx, iotago.BasicBlockMaxParents)
strongParents, weakParents, shallowLikeParents, err := requestTips(ctx, iotago.BasicBlockMaxParents)
if err != nil {
return ierrors.Wrapf(err, "failed to get tips")
}
Expand All @@ -142,7 +149,7 @@ func issueCandidateBlock(ctx context.Context, blockIssuingTime time.Time, curren
return ierrors.Wrap(err, "error creating block")
}

blockID, err := deps.NodeBridge.SubmitBlock(ctx, candidacyBlock)
blockID, err := submitBlock(ctx, candidacyBlock)
if err != nil {
return ierrors.Wrap(err, "error issuing candidacy announcement block")
}
Expand All @@ -158,7 +165,7 @@ func issueValidationBlock(ctx context.Context, blockIssuingTime time.Time, curre
return ierrors.Wrapf(err, "failed to get protocol parameters hash")
}

strongParents, weakParents, shallowLikeParents, err := deps.NodeBridge.RequestTips(ctx, iotago.ValidationBlockMaxParents)
strongParents, weakParents, shallowLikeParents, err := requestTips(ctx, iotago.ValidationBlockMaxParents)
if err != nil {
return ierrors.Wrapf(err, "failed to get tips")
}
Expand Down Expand Up @@ -198,7 +205,7 @@ func issueValidationBlock(ctx context.Context, blockIssuingTime time.Time, curre
return ierrors.Wrapf(err, "error creating validation block")
}

blockID, err := deps.NodeBridge.SubmitBlock(ctx, validationBlock)
blockID, err := submitBlock(ctx, validationBlock)
if err != nil {
return ierrors.Wrapf(err, "error issuing validation block")
}
Expand All @@ -212,7 +219,7 @@ func reviveChain(ctx context.Context, issuingTime time.Time) (*iotago.Commitment
lastCommittedSlot := deps.NodeBridge.NodeStatus().LatestCommitment.CommitmentId.Unwrap().Slot()
apiForSlot := deps.NodeBridge.APIProvider().APIForSlot(lastCommittedSlot)

activeRootBlocks, err := deps.NodeBridge.ActiveRootBlocks(ctx)
activeRootBlocks, err := activeRootBlocks(ctx)
if err != nil {
return nil, iotago.EmptyBlockID, ierrors.Wrapf(err, "failed to retrieve active root blocks")
}
Expand All @@ -239,11 +246,11 @@ func reviveChain(ctx context.Context, issuingTime time.Time) (*iotago.Commitment
}
commitUntilSlot := issuingSlot - apiForSlot.ProtocolParameters().MinCommittableAge()

if err = deps.NodeBridge.ForceCommitUntil(ctx, commitUntilSlot); err != nil {
if err = forceCommitUntil(ctx, commitUntilSlot); err != nil {
return nil, iotago.EmptyBlockID, ierrors.Wrapf(err, "failed to force commit until slot %d", commitUntilSlot)
}

commitment, err := deps.NodeBridge.Commitment(ctx, commitUntilSlot)
commitment, err := commitment(ctx, commitUntilSlot)
if err != nil {
return nil, iotago.EmptyBlockID, ierrors.Wrapf(err, "failed to commit until slot %d to revive chain", commitUntilSlot)
}
Expand All @@ -253,25 +260,25 @@ func reviveChain(ctx context.Context, issuingTime time.Time) (*iotago.Commitment

func getAddressableCommitment(ctx context.Context, blockSlot iotago.SlotIndex) (*iotago.Commitment, error) {
protoParams := deps.NodeBridge.APIProvider().APIForSlot(blockSlot).ProtocolParameters()
commitment := deps.NodeBridge.LatestCommitment().Commitment
latestCommitment := deps.NodeBridge.LatestCommitment().Commitment

if blockSlot > commitment.Slot+protoParams.MaxCommittableAge() {
return nil, ierrors.Wrapf(ErrBlockTooRecent, "can't issue block: block slot %d is too far in the future, latest commitment is %d", blockSlot, commitment.Slot)
if blockSlot > latestCommitment.Slot+protoParams.MaxCommittableAge() {
return nil, ierrors.Wrapf(ErrBlockTooRecent, "can't issue block: block slot %d is too far in the future, latest commitment is %d", blockSlot, latestCommitment.Slot)
}

if blockSlot < commitment.Slot+protoParams.MinCommittableAge() {
if blockSlot < protoParams.GenesisSlot()+protoParams.MinCommittableAge() || commitment.Slot < protoParams.GenesisSlot()+protoParams.MinCommittableAge() {
return commitment, nil
if blockSlot < latestCommitment.Slot+protoParams.MinCommittableAge() {
if blockSlot < protoParams.GenesisSlot()+protoParams.MinCommittableAge() || latestCommitment.Slot < protoParams.GenesisSlot()+protoParams.MinCommittableAge() {
return latestCommitment, nil
}

commitmentSlot := commitment.Slot - protoParams.MinCommittableAge()
loadedCommitment, err := deps.NodeBridge.Commitment(ctx, commitmentSlot)
commitmentSlot := latestCommitment.Slot - protoParams.MinCommittableAge()
loadedCommitment, err := commitment(ctx, commitmentSlot)
if err != nil {
return nil, ierrors.Wrapf(err, "error loading valid commitment of slot %d according to minCommittableAge", commitmentSlot)
}

return loadedCommitment.Commitment, nil
}

return commitment, nil
return latestCommitment, nil
}
84 changes: 84 additions & 0 deletions components/validator/nodebridge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package validator

import (
"context"
"time"

"github.com/iotaledger/inx-app/pkg/nodebridge"
iotago "github.com/iotaledger/iota.go/v4"
)

const (
TimeoutINXReadIsCandidate = 1 * time.Second
TimeoutINXReadIsCommitteeMember = 1 * time.Second
TimeoutINXReadIsValidatorAccount = 1 * time.Second
TimeoutINXRequestTips = 2 * time.Second
TimeoutINXSubmitBlock = 2 * time.Second
TimeoutINXActiveRootBlocks = 2 * time.Second
TimeoutINXForceCommitUntil = 5 * time.Second
TimeoutINXCommitment = 1 * time.Second
)

// readIsCandidate returns true if the given account is a candidate.
func readIsCandidate(ctx context.Context, accountID iotago.AccountID, slot iotago.SlotIndex) (bool, error) {
ctx, cancel := context.WithTimeout(ctx, TimeoutINXReadIsCandidate)
defer cancel()

return deps.NodeBridge.ReadIsCandidate(ctx, accountID, slot)
}

// readIsCommitteeMember returns true if the given account is a committee member.
func readIsCommitteeMember(ctx context.Context, accountID iotago.AccountID, slot iotago.SlotIndex) (bool, error) {
ctx, cancel := context.WithTimeout(ctx, TimeoutINXReadIsCommitteeMember)
defer cancel()

return deps.NodeBridge.ReadIsCommitteeMember(ctx, accountID, slot)
}

// readIsValidatorAccount returns true if the given account is a validator account.
func readIsValidatorAccount(ctx context.Context, accountID iotago.AccountID, slot iotago.SlotIndex) (bool, error) {
ctx, cancel := context.WithTimeout(ctx, TimeoutINXReadIsValidatorAccount)
defer cancel()

return deps.NodeBridge.ReadIsValidatorAccount(ctx, accountID, slot)
}

// requestTips requests tips from the node.
func requestTips(ctx context.Context, count uint32) (iotago.BlockIDs, iotago.BlockIDs, iotago.BlockIDs, error) {
ctx, cancel := context.WithTimeout(ctx, TimeoutINXRequestTips)
defer cancel()

return deps.NodeBridge.RequestTips(ctx, count)
}

// submitBlock submits a block to the node.
func submitBlock(ctx context.Context, block *iotago.Block) (iotago.BlockID, error) {
ctx, cancel := context.WithTimeout(ctx, TimeoutINXSubmitBlock)
defer cancel()

return deps.NodeBridge.SubmitBlock(ctx, block)
}

// activeRootBlocks returns the active root blocks.
func activeRootBlocks(ctx context.Context) (map[iotago.BlockID]iotago.CommitmentID, error) {
ctx, cancel := context.WithTimeout(ctx, TimeoutINXActiveRootBlocks)
defer cancel()

return deps.NodeBridge.ActiveRootBlocks(ctx)
}

// forceCommitUntil forces the node to commit until the given slot.
func forceCommitUntil(ctx context.Context, slot iotago.SlotIndex) error {
ctx, cancel := context.WithTimeout(ctx, TimeoutINXForceCommitUntil)
defer cancel()

return deps.NodeBridge.ForceCommitUntil(ctx, slot)
}

// commitment returns the commitment for the given slot.
func commitment(ctx context.Context, slot iotago.SlotIndex) (*nodebridge.Commitment, error) {
ctx, cancel := context.WithTimeout(ctx, TimeoutINXCommitment)
defer cancel()

return deps.NodeBridge.Commitment(ctx, slot)
}

0 comments on commit d10a77b

Please sign in to comment.