Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix shutdown deadlock #21

Merged
merged 4 commits into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion components/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ var (
Name = "inx-validator"

// Version of the app.
Version = "1.0.0-alpha.10"
Version = "1.0.0-alpha.11"
)

func App() *app.App {
Expand Down
5 changes: 3 additions & 2 deletions components/validator/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,15 @@ func run() error {

<-ctx.Done()

executor.Shutdown()
// we can cancel the pending elements here, because we don't want to issue any more blocks
executor.Shutdown(timed.CancelPendingElements)

Component.LogInfo("Stopping Validator... done")
}, daemon.PriorityStopValidator)
}

func checkValidatorStatus(ctx context.Context) {
isAccountValidator, err := deps.NodeBridge.ReadIsValidatorAccount(ctx, validatorAccount.ID(), deps.NodeBridge.NodeStatus().LatestCommitment.CommitmentId.Unwrap().Slot())
isAccountValidator, err := readIsValidatorAccount(ctx, validatorAccount.ID(), deps.NodeBridge.NodeStatus().LatestCommitment.CommitmentId.Unwrap().Slot())
if err != nil {
Component.LogErrorf("error when retrieving Validator account %s: %w", validatorAccount.ID(), err)

Expand Down
49 changes: 28 additions & 21 deletions components/validator/issuer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@ func candidateAction(ctx context.Context) {
currentAPI := deps.NodeBridge.APIProvider().APIForTime(now)
currentSlot := currentAPI.TimeProvider().SlotFromTime(now)

isCandidate, err := deps.NodeBridge.ReadIsCandidate(ctx, validatorAccount.ID(), currentSlot)
isCandidate, err := readIsCandidate(ctx, validatorAccount.ID(), currentSlot)
if err != nil {
Component.LogWarnf("error while checking if account is already a candidate: %s", err.Error())
// If there is an error, then retry registering as a candidate.
executor.ExecuteAt(CandidateTask, func() { candidateAction(ctx) }, now.Add(ParamsValidator.CandidacyRetryInterval))

// If there is an error, then retry registering as a candidate, except if the context was canceled.
if !ierrors.Is(err, context.Canceled) {
executor.ExecuteAt(CandidateTask, func() { candidateAction(ctx) }, now.Add(ParamsValidator.CandidacyRetryInterval))
}

return
}
Expand Down Expand Up @@ -80,10 +83,14 @@ func committeeMemberAction(ctx context.Context) {

// If we are bootstrapped let's check if we are part of the committee.
if deps.NodeBridge.NodeStatus().GetIsBootstrapped() {
isCommitteeMember, err := deps.NodeBridge.ReadIsCommitteeMember(ctx, validatorAccount.ID(), currentSlot)
isCommitteeMember, err := readIsCommitteeMember(ctx, validatorAccount.ID(), currentSlot)
if err != nil {
Component.LogWarnf("error while checking if account %s is a committee member in slot %d: %s", validatorAccount.ID(), currentSlot, err.Error())
executor.ExecuteAt(CommitteeTask, func() { committeeMemberAction(ctx) }, now.Add(committeeBroadcastInterval))

// If there is an error, then retry, except if the context was canceled.
if !ierrors.Is(err, context.Canceled) {
executor.ExecuteAt(CommitteeTask, func() { committeeMemberAction(ctx) }, now.Add(committeeBroadcastInterval))
}

return
}
Expand Down Expand Up @@ -117,7 +124,7 @@ func committeeMemberAction(ctx context.Context) {
func issueCandidateBlock(ctx context.Context, blockIssuingTime time.Time, currentAPI iotago.API) error {
blockSlot := currentAPI.TimeProvider().SlotFromTime(blockIssuingTime)

strongParents, weakParents, shallowLikeParents, err := deps.NodeBridge.RequestTips(ctx, iotago.BasicBlockMaxParents)
strongParents, weakParents, shallowLikeParents, err := requestTips(ctx, iotago.BasicBlockMaxParents)
if err != nil {
return ierrors.Wrapf(err, "failed to get tips")
}
Expand All @@ -142,7 +149,7 @@ func issueCandidateBlock(ctx context.Context, blockIssuingTime time.Time, curren
return ierrors.Wrap(err, "error creating block")
}

blockID, err := deps.NodeBridge.SubmitBlock(ctx, candidacyBlock)
blockID, err := submitBlock(ctx, candidacyBlock)
if err != nil {
return ierrors.Wrap(err, "error issuing candidacy announcement block")
}
Expand All @@ -158,7 +165,7 @@ func issueValidationBlock(ctx context.Context, blockIssuingTime time.Time, curre
return ierrors.Wrapf(err, "failed to get protocol parameters hash")
}

strongParents, weakParents, shallowLikeParents, err := deps.NodeBridge.RequestTips(ctx, iotago.ValidationBlockMaxParents)
strongParents, weakParents, shallowLikeParents, err := requestTips(ctx, iotago.ValidationBlockMaxParents)
if err != nil {
return ierrors.Wrapf(err, "failed to get tips")
}
Expand Down Expand Up @@ -198,7 +205,7 @@ func issueValidationBlock(ctx context.Context, blockIssuingTime time.Time, curre
return ierrors.Wrapf(err, "error creating validation block")
}

blockID, err := deps.NodeBridge.SubmitBlock(ctx, validationBlock)
blockID, err := submitBlock(ctx, validationBlock)
if err != nil {
return ierrors.Wrapf(err, "error issuing validation block")
}
Expand All @@ -212,7 +219,7 @@ func reviveChain(ctx context.Context, issuingTime time.Time) (*iotago.Commitment
lastCommittedSlot := deps.NodeBridge.NodeStatus().LatestCommitment.CommitmentId.Unwrap().Slot()
apiForSlot := deps.NodeBridge.APIProvider().APIForSlot(lastCommittedSlot)

activeRootBlocks, err := deps.NodeBridge.ActiveRootBlocks(ctx)
activeRootBlocks, err := activeRootBlocks(ctx)
if err != nil {
return nil, iotago.EmptyBlockID, ierrors.Wrapf(err, "failed to retrieve active root blocks")
}
Expand All @@ -239,11 +246,11 @@ func reviveChain(ctx context.Context, issuingTime time.Time) (*iotago.Commitment
}
commitUntilSlot := issuingSlot - apiForSlot.ProtocolParameters().MinCommittableAge()

if err = deps.NodeBridge.ForceCommitUntil(ctx, commitUntilSlot); err != nil {
if err = forceCommitUntil(ctx, commitUntilSlot); err != nil {
return nil, iotago.EmptyBlockID, ierrors.Wrapf(err, "failed to force commit until slot %d", commitUntilSlot)
}

commitment, err := deps.NodeBridge.Commitment(ctx, commitUntilSlot)
commitment, err := commitment(ctx, commitUntilSlot)
if err != nil {
return nil, iotago.EmptyBlockID, ierrors.Wrapf(err, "failed to commit until slot %d to revive chain", commitUntilSlot)
}
Expand All @@ -253,25 +260,25 @@ func reviveChain(ctx context.Context, issuingTime time.Time) (*iotago.Commitment

func getAddressableCommitment(ctx context.Context, blockSlot iotago.SlotIndex) (*iotago.Commitment, error) {
protoParams := deps.NodeBridge.APIProvider().APIForSlot(blockSlot).ProtocolParameters()
commitment := deps.NodeBridge.LatestCommitment().Commitment
latestCommitment := deps.NodeBridge.LatestCommitment().Commitment

if blockSlot > commitment.Slot+protoParams.MaxCommittableAge() {
return nil, ierrors.Wrapf(ErrBlockTooRecent, "can't issue block: block slot %d is too far in the future, latest commitment is %d", blockSlot, commitment.Slot)
if blockSlot > latestCommitment.Slot+protoParams.MaxCommittableAge() {
return nil, ierrors.Wrapf(ErrBlockTooRecent, "can't issue block: block slot %d is too far in the future, latest commitment is %d", blockSlot, latestCommitment.Slot)
}

if blockSlot < commitment.Slot+protoParams.MinCommittableAge() {
if blockSlot < protoParams.GenesisSlot()+protoParams.MinCommittableAge() || commitment.Slot < protoParams.GenesisSlot()+protoParams.MinCommittableAge() {
return commitment, nil
if blockSlot < latestCommitment.Slot+protoParams.MinCommittableAge() {
if blockSlot < protoParams.GenesisSlot()+protoParams.MinCommittableAge() || latestCommitment.Slot < protoParams.GenesisSlot()+protoParams.MinCommittableAge() {
return latestCommitment, nil
}

commitmentSlot := commitment.Slot - protoParams.MinCommittableAge()
loadedCommitment, err := deps.NodeBridge.Commitment(ctx, commitmentSlot)
commitmentSlot := latestCommitment.Slot - protoParams.MinCommittableAge()
loadedCommitment, err := commitment(ctx, commitmentSlot)
if err != nil {
return nil, ierrors.Wrapf(err, "error loading valid commitment of slot %d according to minCommittableAge", commitmentSlot)
}

return loadedCommitment.Commitment, nil
}

return commitment, nil
return latestCommitment, nil
}
84 changes: 84 additions & 0 deletions components/validator/nodebridge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package validator

import (
"context"
"time"

"github.com/iotaledger/inx-app/pkg/nodebridge"
iotago "github.com/iotaledger/iota.go/v4"
)

// Timeouts for the individual INX calls issued through the node bridge.
// Each wrapper function in this file derives a child context with the
// matching timeout before forwarding the call to deps.NodeBridge.
const (
// TimeoutINXReadIsCandidate is the timeout used by readIsCandidate.
TimeoutINXReadIsCandidate = 1 * time.Second
// TimeoutINXReadIsCommitteeMember is the timeout used by readIsCommitteeMember.
TimeoutINXReadIsCommitteeMember = 1 * time.Second
// TimeoutINXReadIsValidatorAccount is the timeout used by readIsValidatorAccount.
TimeoutINXReadIsValidatorAccount = 1 * time.Second
// TimeoutINXRequestTips is the timeout used by requestTips.
TimeoutINXRequestTips = 2 * time.Second
// TimeoutINXSubmitBlock is the timeout used by submitBlock.
TimeoutINXSubmitBlock = 2 * time.Second
// TimeoutINXActiveRootBlocks is the timeout used by activeRootBlocks.
TimeoutINXActiveRootBlocks = 2 * time.Second
// TimeoutINXForceCommitUntil is the timeout used by forceCommitUntil.
TimeoutINXForceCommitUntil = 5 * time.Second
// TimeoutINXCommitment is the timeout used by commitment.
TimeoutINXCommitment = 1 * time.Second
)

// readIsCandidate reports whether the given account is a candidate for the given slot.
// The request is forwarded to the node bridge with a context derived from ctx
// that additionally expires after TimeoutINXReadIsCandidate.
func readIsCandidate(ctx context.Context, accountID iotago.AccountID, slot iotago.SlotIndex) (bool, error) {
	timeoutCtx, cancelTimeout := context.WithTimeout(ctx, TimeoutINXReadIsCandidate)
	// Release the timer as soon as the call has returned.
	defer cancelTimeout()

	isCandidate, err := deps.NodeBridge.ReadIsCandidate(timeoutCtx, accountID, slot)

	return isCandidate, err
}

// readIsCommitteeMember reports whether the given account is a committee member for the given slot.
// The request is forwarded to the node bridge with a context derived from ctx
// that additionally expires after TimeoutINXReadIsCommitteeMember.
func readIsCommitteeMember(ctx context.Context, accountID iotago.AccountID, slot iotago.SlotIndex) (bool, error) {
	timeoutCtx, cancelTimeout := context.WithTimeout(ctx, TimeoutINXReadIsCommitteeMember)
	// Release the timer as soon as the call has returned.
	defer cancelTimeout()

	isCommitteeMember, err := deps.NodeBridge.ReadIsCommitteeMember(timeoutCtx, accountID, slot)

	return isCommitteeMember, err
}

// readIsValidatorAccount reports whether the given account is a validator account for the given slot.
// The request is forwarded to the node bridge with a context derived from ctx
// that additionally expires after TimeoutINXReadIsValidatorAccount.
func readIsValidatorAccount(ctx context.Context, accountID iotago.AccountID, slot iotago.SlotIndex) (bool, error) {
	timeoutCtx, cancelTimeout := context.WithTimeout(ctx, TimeoutINXReadIsValidatorAccount)
	// Release the timer as soon as the call has returned.
	defer cancelTimeout()

	isValidator, err := deps.NodeBridge.ReadIsValidatorAccount(timeoutCtx, accountID, slot)

	return isValidator, err
}

// requestTips asks the node for tips, passing count through to the node bridge.
// It returns the three block ID lists and the error exactly as produced by
// deps.NodeBridge.RequestTips. The call uses a context derived from ctx that
// additionally expires after TimeoutINXRequestTips.
func requestTips(ctx context.Context, count uint32) (iotago.BlockIDs, iotago.BlockIDs, iotago.BlockIDs, error) {
	timeoutCtx, cancelTimeout := context.WithTimeout(ctx, TimeoutINXRequestTips)
	// Release the timer as soon as the call has returned.
	defer cancelTimeout()

	strong, weak, shallowLike, err := deps.NodeBridge.RequestTips(timeoutCtx, count)

	return strong, weak, shallowLike, err
}

// submitBlock hands the given block to the node and returns the block ID and
// error reported by deps.NodeBridge.SubmitBlock. The call uses a context
// derived from ctx that additionally expires after TimeoutINXSubmitBlock.
func submitBlock(ctx context.Context, block *iotago.Block) (iotago.BlockID, error) {
	timeoutCtx, cancelTimeout := context.WithTimeout(ctx, TimeoutINXSubmitBlock)
	// Release the timer as soon as the call has returned.
	defer cancelTimeout()

	blockID, err := deps.NodeBridge.SubmitBlock(timeoutCtx, block)

	return blockID, err
}

// activeRootBlocks fetches the active root blocks from the node as a map from
// block ID to commitment ID. The call uses a context derived from ctx that
// additionally expires after TimeoutINXActiveRootBlocks.
func activeRootBlocks(ctx context.Context) (map[iotago.BlockID]iotago.CommitmentID, error) {
	timeoutCtx, cancelTimeout := context.WithTimeout(ctx, TimeoutINXActiveRootBlocks)
	// Release the timer as soon as the call has returned.
	defer cancelTimeout()

	rootBlocks, err := deps.NodeBridge.ActiveRootBlocks(timeoutCtx)

	return rootBlocks, err
}

// forceCommitUntil asks the node to force commitments up to the given slot and
// returns the error reported by deps.NodeBridge.ForceCommitUntil. The call
// uses a context derived from ctx that additionally expires after
// TimeoutINXForceCommitUntil.
func forceCommitUntil(ctx context.Context, slot iotago.SlotIndex) error {
	timeoutCtx, cancelTimeout := context.WithTimeout(ctx, TimeoutINXForceCommitUntil)
	// Release the timer as soon as the call has returned.
	defer cancelTimeout()

	err := deps.NodeBridge.ForceCommitUntil(timeoutCtx, slot)

	return err
}

// commitment loads the commitment for the given slot from the node. The call
// uses a context derived from ctx that additionally expires after
// TimeoutINXCommitment.
func commitment(ctx context.Context, slot iotago.SlotIndex) (*nodebridge.Commitment, error) {
	timeoutCtx, cancelTimeout := context.WithTimeout(ctx, TimeoutINXCommitment)
	// Release the timer as soon as the call has returned.
	defer cancelTimeout()

	loaded, err := deps.NodeBridge.Commitment(timeoutCtx, slot)

	return loaded, err
}
Loading