diff --git a/node/pkg/raft/accessors.go b/node/pkg/raft/accessors.go index b61e9294d..43d95a19f 100644 --- a/node/pkg/raft/accessors.go +++ b/node/pkg/raft/accessors.go @@ -6,72 +6,12 @@ func (r *Raft) IncreaseTerm() { r.Term++ } -func (r *Raft) UpdateTerm(newTerm int) { - r.Mutex.Lock() - defer r.Mutex.Unlock() - r.Term = newTerm -} - -func (r *Raft) GetCurrentTerm() int { - r.Mutex.Lock() - defer r.Mutex.Unlock() - return r.Term -} - -func (r *Raft) IncreaseVote() { - r.Mutex.Lock() - defer r.Mutex.Unlock() - r.VotesReceived++ -} - -func (r *Raft) UpdateVoteReceived(votes int) { - r.Mutex.Lock() - defer r.Mutex.Unlock() - r.VotesReceived = votes -} - -func (r *Raft) GetVoteReceived() int { - r.Mutex.Lock() - defer r.Mutex.Unlock() - return r.VotesReceived -} - -func (r *Raft) UpdateRole(role RoleType) { - r.Mutex.Lock() - defer r.Mutex.Unlock() - r.Role = role -} - -func (r *Raft) GetRole() RoleType { - r.Mutex.Lock() - defer r.Mutex.Unlock() - return r.Role -} - -func (r *Raft) GetVotedFor() string { - r.Mutex.Lock() - defer r.Mutex.Unlock() - return r.VotedFor -} - func (r *Raft) GetLeader() string { r.Mutex.Lock() defer r.Mutex.Unlock() return r.LeaderID } -func (r *Raft) UpdateLeader(leader string) { - r.Mutex.Lock() - defer r.Mutex.Unlock() - r.LeaderID = leader -} - -func (r *Raft) UpdateVotedFor(votedFor string) { - r.Mutex.Lock() - defer r.Mutex.Unlock() - r.VotedFor = votedFor -} - func (r *Raft) SubscribersCount() int { return len(r.Ps.ListPeers(r.Topic.String())) } diff --git a/node/pkg/raft/raft.go b/node/pkg/raft/raft.go index 2b9dba36e..55649c524 100644 --- a/node/pkg/raft/raft.go +++ b/node/pkg/raft/raft.go @@ -47,15 +47,10 @@ func NewRaftNode( func (r *Raft) Run(ctx context.Context) { go r.subscribe(ctx) r.startElectionTimer() - for { select { case msg := <-r.MessageBuffer: - err := r.handleMessage(ctx, msg) - if err != nil { - log.Error().Err(err).Msg("failed to handle message") - } - + go r.handleMessage(ctx, msg) case <-r.ElectionTimer.C: r.startElection() case <-ctx.Done(): @@ -84,11 +79,13 @@ func (r *Raft) subscribe(ctx context.Context) { log.Error().Err(err).Msg("failed to get message from topic") continue } + msg, err := r.unmarshalMessage(rawMsg.Data) if err != nil { log.Error().Err(err).Msg("failed to unmarshal message") continue } + r.MessageBuffer <- msg } } @@ -113,7 +110,6 @@ func (r *Raft) handleHeartbeat(msg Message) error { if msg.SentFrom == r.GetHostId() { return nil } - var heartbeatMessage HeartbeatMessage err := json.Unmarshal(msg.Data, &heartbeatMessage) if err != nil { @@ -125,9 +121,12 @@ func (r *Raft) handleHeartbeat(msg Message) error { return errorSentinel.ErrRaftLeaderIdMismatch } - currentRole := r.GetRole() - currentTerm := r.GetCurrentTerm() - currentLeader := r.GetLeader() + r.Mutex.Lock() + defer r.Mutex.Unlock() + + currentRole := r.Role + currentTerm := r.Term + currentLeader := r.LeaderID if currentTerm > heartbeatMessage.Term && currentRole != Leader { r.startElectionTimer() @@ -141,21 +140,22 @@ func (r *Raft) handleHeartbeat(msg Message) error { if currentRole == Leader { r.ResignLeader() } else if currentRole == Candidate { - r.UpdateRole(Follower) + r.Role = Follower } r.startElectionTimer() - r.UpdateTerm(heartbeatMessage.Term) + r.Term = heartbeatMessage.Term if currentLeader != heartbeatMessage.LeaderID { - r.UpdateLeader(heartbeatMessage.LeaderID) + r.LeaderID = heartbeatMessage.LeaderID } - return nil } func (r *Raft) handleRequestVote(msg Message) error { - if r.GetRole() == Leader { + r.Mutex.Lock() + defer r.Mutex.Unlock() + if r.Role == Leader { return nil } @@ -166,32 +166,34 @@ func (r *Raft) handleRequestVote(msg Message) error { return err } - currentTerm := r.GetCurrentTerm() + currentTerm := r.Term if RequestVoteMessage.Term > currentTerm { - r.UpdateTerm(RequestVoteMessage.Term) + r.Term = RequestVoteMessage.Term } if RequestVoteMessage.Term < currentTerm { return r.sendReplyVote(msg.SentFrom, false) } - if r.GetRole() == Candidate && RequestVoteMessage.Term == currentTerm && msg.SentFrom != r.GetHostId() { - r.UpdateRole(Follower) + if r.Role == Candidate && RequestVoteMessage.Term == currentTerm && msg.SentFrom != r.GetHostId() { + r.Role = Follower return r.sendReplyVote(msg.SentFrom, false) } voteGranted := false - if r.GetVotedFor() == "" || r.GetVotedFor() == msg.SentFrom { + if r.VotedFor == "" || r.VotedFor == msg.SentFrom { voteGranted = true - r.UpdateVotedFor(msg.SentFrom) + r.VotedFor = msg.SentFrom } log.Debug().Bool("vote granted", voteGranted).Msg("voted") return r.sendReplyVote(msg.SentFrom, voteGranted) } func (r *Raft) handleReplyVote(ctx context.Context, msg Message) error { - if r.GetRole() != Candidate { + r.Mutex.Lock() + defer r.Mutex.Unlock() + if r.Role != Candidate { return nil } @@ -205,11 +207,11 @@ func (r *Raft) handleReplyVote(ctx context.Context, msg Message) error { return nil } - if replyVoteMessage.VoteGranted && replyVoteMessage.LeaderID == r.GetHostId() && r.GetRole() == Candidate { - r.IncreaseVote() - log.Debug().Int("vote received", r.GetVoteReceived()).Msg("vote received") + if replyVoteMessage.VoteGranted && replyVoteMessage.LeaderID == r.GetHostId() && r.Role == Candidate { + r.VotesReceived++ + log.Debug().Int("vote received", r.VotesReceived).Msg("vote received") log.Debug().Int("subscribers count", r.SubscribersCount()).Msg("subscribers count") - if r.GetVoteReceived() >= (r.SubscribersCount()+1)/2 { + if r.VotesReceived >= (r.SubscribersCount()+1)/2 { r.becomeLeader(ctx) } } @@ -227,11 +229,12 @@ func (r *Raft) PublishMessage(msg Message) error { } func (r *Raft) sendHeartbeat() error { - + r.Mutex.Lock() heartbeatMessage := HeartbeatMessage{ LeaderID: r.GetHostId(), - Term: r.GetCurrentTerm(), + Term: r.Term, } + r.Mutex.Unlock() marshalledHeartbeatMsg, err := json.Marshal(heartbeatMessage) if err != nil { log.Error().Err(err).Msg("failed to marshal heartbeat message") @@ -274,7 +277,7 @@ func (r *Raft) sendReplyVote(to string, voteGranted bool) error { func (r *Raft) sendRequestVote() error { requestVoteMessage := RequestVoteMessage{ - Term: r.GetCurrentTerm(), + Term: r.Term, } marshalledRequestVoteMsg, err := json.Marshal(requestVoteMessage) if err != nil { @@ -299,32 +302,37 @@ func (r *Raft) ResignLeader() { if r.Resign != nil { close(r.Resign) r.Resign = nil - - r.UpdateRole(Follower) - r.UpdateLeader("") + r.Role = Follower + r.LeaderID = "" r.startElectionTimer() } } -func (r *Raft) becomeLeader(ctx context.Context) { - - log.Debug().Msg("becoming leader") - +func (r *Raft) setLeaderState() { r.Resign = make(chan interface{}) r.ElectionTimer.Stop() - r.UpdateRole(Leader) - r.UpdateLeader(r.GetHostId()) + r.Role = Leader + r.LeaderID = r.GetHostId() r.HeartbeatTicker = time.NewTicker(r.HeartbeatTimeout) r.LeaderJobTicker = time.NewTicker(r.LeaderJobTimeout) +} +func (r *Raft) becomeLeader(ctx context.Context) { + r.setLeaderState() go func() { + defer func() { + if r := recover(); r != nil { + log.Error().Msgf("recovered from panic in leader job: %v", r) + } + }() + for { select { case <-r.Resign: - log.Debug().Msg("resigning as leader") + r.Mutex.Lock() r.HeartbeatTicker.Stop() r.LeaderJobTicker.Stop() - + r.Mutex.Unlock() return case <-r.HeartbeatTicker.C: @@ -335,11 +343,6 @@ func (r *Raft) becomeLeader(ctx context.Context) { case <-r.LeaderJobTicker.C: go func() { - defer func() { - if r := recover(); r != nil { - log.Error().Msgf("recovered from panic in leader job: %v", r) - } - }() err := r.LeaderJob() if err != nil { log.Error().Err(err).Msg("failed to execute leader job") @@ -348,8 +351,10 @@ func (r *Raft) becomeLeader(ctx context.Context) { case <-ctx.Done(): log.Debug().Msg("context cancelled") + r.Mutex.Lock() r.HeartbeatTicker.Stop() r.LeaderJobTicker.Stop() + r.Mutex.Unlock() return } } @@ -359,23 +364,34 @@ func (r *Raft) becomeLeader(ctx context.Context) { func (r *Raft) getRandomElectionTimeout() time.Duration { minTimeout := int(r.HeartbeatTimeout) * 3 maxTimeout := int(r.HeartbeatTimeout) * 6 - return time.Duration(minTimeout + rand.Intn(maxTimeout-minTimeout)) + duration := time.Duration(minTimeout + rand.Intn(maxTimeout-minTimeout)) + return duration } func (r *Raft) startElectionTimer() { if r.ElectionTimer != nil { - r.ElectionTimer.Stop() + if !r.ElectionTimer.Stop() { + select { + case <-r.ElectionTimer.C: + log.Debug().Msg("Old timer channel drained") + default: + log.Debug().Msg("Old timer channel already empty") + } + } + r.ElectionTimer.Reset(r.getRandomElectionTimeout()) + } else { + r.ElectionTimer = time.NewTimer(r.getRandomElectionTimeout()) } - r.ElectionTimer = time.NewTimer(r.getRandomElectionTimeout()) } func (r *Raft) startElection() { - r.IncreaseTerm() - r.UpdateVoteReceived(0) log.Debug().Msg("start election") - - r.UpdateRole(Candidate) - r.UpdateVotedFor(r.GetHostId()) + r.Mutex.Lock() + defer r.Mutex.Unlock() + r.Term++ + r.VotesReceived = 0 + r.Role = Candidate + r.VotedFor = r.GetHostId() r.startElectionTimer()