Commit: wip
nick-bisonai committed Sep 26, 2024
1 parent fd3d9ae commit 97a1aac
Showing 1 changed file with 258 additions and 14 deletions.
272 changes: 258 additions & 14 deletions node/pkg/raft/raft_test.go
@@ -65,10 +65,9 @@ func allNodesHaveSameTermMin(nodes []*Raft, min int) bool {
     return true
 }

-func setup(ctx context.Context, cancel context.CancelFunc) (func() error, *TestItems, error) {
-
+func setup(ctx context.Context, cancel context.CancelFunc, numOfNodes int) (func() error, *TestItems, error) {
     hosts := make([]host.Host, 0)
-    for i := 0; i < 3; i++ {
+    for i := 0; i < numOfNodes; i++ {
         host, err := libp2psetup.NewHost(ctx)
         if err != nil {
             return nil, nil, err
@@ -77,21 +76,21 @@ func setup(ctx context.Context, cancel context.CancelFunc) (func() error, *TestItems, error) {
     }

     pss := make([]*pubsub.PubSub, 0)
-    for i := 0; i < 3; i++ {
+    for i := 0; i < numOfNodes; i++ {
         ps, err := libp2psetup.MakePubsub(ctx, hosts[i])
         if err != nil {
             return nil, nil, err
         }
         pss = append(pss, ps)
     }

-    for i := 0; i < 3; i++ {
+    for i := 0; i < numOfNodes; i++ {
         host := hosts[i]
-        host.Connect(ctx, peer.AddrInfo{ID: hosts[(i+1)%3].ID(), Addrs: hosts[(i+1)%3].Addrs()})
+        host.Connect(ctx, peer.AddrInfo{ID: hosts[(i+1)%numOfNodes].ID(), Addrs: hosts[(i+1)%numOfNodes].Addrs()})
     }

     topics := make([]*pubsub.Topic, 0)
-    for i := 0; i < 3; i++ {
+    for i := 0; i < numOfNodes; i++ {
         topic, err := pss[i].Join(topicString)
         if err != nil {
             return nil, nil, err
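The connection loop above dials host (i+1) % numOfNodes from host i, forming a ring; the pubsub layer is then responsible for propagating messages beyond direct links. With numOfNodes = 2 the two iterations are mutual dials (0→1 and 1→0), i.e. a single link. A standalone illustration of the dialing pattern (for reference only, not part of this commit):

```go
package main

import "fmt"

// Prints which peer each host dials under setup's ring pattern:
// host i connects to host (i+1) % numOfNodes.
func main() {
	for _, n := range []int{2, 3, 5} {
		fmt.Printf("numOfNodes=%d:", n)
		for i := 0; i < n; i++ {
			fmt.Printf(" %d->%d", i, (i+1)%n)
		}
		fmt.Println()
	}
}
```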
@@ -100,7 +99,7 @@ func setup(ctx context.Context, cancel context.CancelFunc) (func() error, *TestItems, error) {
     }

     raftNodes := []*Raft{}
-    for i := 0; i < 3; i++ {
+    for i := 0; i < numOfNodes; i++ {
         node := NewRaftNode(hosts[i], pss[i], topics[i], 100, time.Second)
         node.LeaderJob = func(context.Context) error {
             log.Debug().Int("subscribers", node.SubscribersCount()).Int("Term", node.GetCurrentTerm()).Msg("Leader job")
@@ -136,8 +135,8 @@ func setup(ctx context.Context, cancel context.CancelFunc) (func() error, *TestItems, error) {
     return cleanup, testItems, nil
 }

-func setupRaftCluster(ctx context.Context, cancel context.CancelFunc, t *testing.T) (cleanup func() error, testItems *TestItems) {
-    cleanup, testItems, err := setup(ctx, cancel)
+func setupRaftCluster(ctx context.Context, cancel context.CancelFunc, numOfNodes int, t *testing.T) (cleanup func() error, testItems *TestItems) {
+    cleanup, testItems, err := setup(ctx, cancel, numOfNodes)
     if err != nil {
         cancel()
         t.Fatalf("error setting up test: %v", err)
@@ -149,7 +148,7 @@ func TestRaft_LeaderElection(t *testing.T) {
     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()

-    cleanup, testItems := setupRaftCluster(ctx, cancel, t)
+    cleanup, testItems := setupRaftCluster(ctx, cancel, 3, t)
     defer func() {
         if cleanupErr := cleanup(); cleanupErr != nil {
             t.Logf("Cleanup failed: %v", cleanupErr)
@@ -202,7 +201,7 @@ func TestRaft_LeaderResignAndReelection(t *testing.T) {
     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()

-    cleanup, testItems := setupRaftCluster(ctx, cancel, t)
+    cleanup, testItems := setupRaftCluster(ctx, cancel, 3, t)
     defer func() {
         if cleanupErr := cleanup(); cleanupErr != nil {
             t.Logf("Cleanup failed: %v", cleanupErr)
@@ -244,7 +243,7 @@ func TestRaft_NewNodeJoin(t *testing.T) {
    ctx, cancel := context.WithCancel(context.Background())
     defer cancel()

-    cleanup, testItems := setupRaftCluster(ctx, cancel, t)
+    cleanup, testItems := setupRaftCluster(ctx, cancel, 3, t)
     defer func() {
         if cleanupErr := cleanup(); cleanupErr != nil {
             t.Logf("Cleanup failed: %v", cleanupErr)
@@ -295,7 +294,7 @@ func TestRaft_LeaderDisconnect(t *testing.T) {
     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()

-    cleanup, testItems := setupRaftCluster(ctx, cancel, t)
+    cleanup, testItems := setupRaftCluster(ctx, cancel, 3, t)
     defer func() {
         if cleanupErr := cleanup(); cleanupErr != nil {
             t.Logf("Cleanup failed: %v", cleanupErr)
@@ -378,3 +377,248 @@ func joinNewNode(ctx context.Context, testItems *TestItems, t *testing.T) *Raft {
     go newNode.Run(ctx)
     return newNode
 }
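The tests added below rerun the election, resign, and disconnect scenarios on a two-node cluster. For reference, classic Raft requires a strict majority of a fixed membership, so a two-node cluster needs both votes and cannot elect a replacement leader after losing a member; the disconnect test below nevertheless expects the survivor to take over, and the SubscribersCount assertions suggest membership here is derived from live pubsub subscribers rather than a fixed size. A sketch of the textbook majority rule, for comparison only (hypothetical helper, not from this repository):

```go
// Classic Raft strict-majority rule, for reference:
// votesNeeded(2) == 2, votesNeeded(3) == 2, votesNeeded(5) == 3.
func votesNeeded(clusterSize int) int {
	return clusterSize/2 + 1
}
```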

+func TestRaft_LeaderElection_2_nodes(t *testing.T) {
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
+    cleanup, testItems := setupRaftCluster(ctx, cancel, 2, t)
+    defer func() {
+        if cleanupErr := cleanup(); cleanupErr != nil {
+            t.Logf("Cleanup failed: %v", cleanupErr)
+        }
+    }()
+
+    for _, raftNode := range testItems.RaftNodes {
+        go raftNode.Run(ctx)
+    }
+
+    WaitForCondition(ctx, t, func() bool {
+        return allNodesHaveSameTerm(testItems.RaftNodes)
+    })
+
+    t.Run("Verify single leader across nodes", func(t *testing.T) {
+        WaitForCondition(ctx, t, func() bool {
+            return allNodesHaveSameTerm(testItems.RaftNodes)
+        })
+
+        leaderIds := make(map[string]struct{})
+        for _, node := range testItems.RaftNodes {
+            leader := node.GetLeader()
+            leaderIds[leader] = struct{}{}
+            assert.Equal(t, 1, node.SubscribersCount())
+        }
+        assert.Equal(t, 1, len(leaderIds))
+    })
+
+    t.Run("All nodes except leader should be followers", func(t *testing.T) {
+        followerCount := 0
+        for _, node := range testItems.RaftNodes {
+            if node.GetRole() == Follower {
+                followerCount++
+            }
+        }
+        assert.Equal(t, len(testItems.RaftNodes)-1, followerCount)
+    })
+
+    t.Run("All nodes should have the same term", func(t *testing.T) {
+        terms := make(map[int]struct{})
+        for _, node := range testItems.RaftNodes {
+            term := node.GetCurrentTerm()
+            terms[term] = struct{}{}
+        }
+        assert.Equal(t, 1, len(terms))
+    })
+}
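Every test in this file polls WaitForCondition, whose implementation sits outside the diff's context lines. A minimal sketch of a helper with this signature, assuming a 100 ms poll and a 10 s timeout (the repository's actual version may differ):

```go
// Hypothetical sketch of the WaitForCondition helper used above; assumes
// "context", "testing", and "time" are imported in this test file.
func WaitForCondition(ctx context.Context, t *testing.T, cond func() bool) {
	t.Helper()
	ticker := time.NewTicker(100 * time.Millisecond) // assumed poll interval
	defer ticker.Stop()
	deadline := time.After(10 * time.Second) // assumed timeout
	for {
		select {
		case <-ctx.Done():
			t.Fatal("context cancelled while waiting for condition")
		case <-deadline:
			t.Fatal("timed out waiting for condition")
		case <-ticker.C:
			if cond() {
				return
			}
		}
	}
}
```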

+func TestRaft_LeaderResignAndReelection_2_nodes(t *testing.T) {
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
+    cleanup, testItems := setupRaftCluster(ctx, cancel, 2, t)
+    defer func() {
+        if cleanupErr := cleanup(); cleanupErr != nil {
+            t.Logf("Cleanup failed: %v", cleanupErr)
+        }
+    }()
+
+    for _, raftNode := range testItems.RaftNodes {
+        go raftNode.Run(ctx)
+    }
+
+    WaitForCondition(ctx, t, func() bool {
+        return allNodesHaveSameTerm(testItems.RaftNodes)
+    })
+
+    t.Run("Leader resign and reelection", func(t *testing.T) {
+        var lastTerm int
+        for _, node := range testItems.RaftNodes {
+            if node.GetRole() == Leader {
+                lastTerm = node.GetCurrentTerm()
+                node.ResignLeader()
+                break
+            }
+        }
+        WaitForCondition(ctx, t, func() bool {
+            return allNodesHaveSameTermMin(testItems.RaftNodes, lastTerm)
+        })
+
+        leaderIds := make(map[string]struct{})
+        for _, node := range testItems.RaftNodes {
+            leader := node.GetLeader()
+            leaderIds[leader] = struct{}{}
+        }
+
+        assert.Equal(t, 1, len(leaderIds))
+    })
+}
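The first hunk header in this diff shows the signature of allNodesHaveSameTermMin(nodes []*Raft, min int) bool but elides its body, and allNodesHaveSameTerm is not shown at all. Hedged reconstructions consistent with how the tests use them (the actual implementations may differ):

```go
// Hypothetical: true when every node reports the same current term.
func allNodesHaveSameTerm(nodes []*Raft) bool {
	if len(nodes) == 0 {
		return true
	}
	term := nodes[0].GetCurrentTerm()
	for _, n := range nodes[1:] {
		if n.GetCurrentTerm() != term {
			return false
		}
	}
	return true
}

// Hypothetical: same agreement check, but only once terms have advanced past
// min, which is how the resign/disconnect tests wait for a fresh election.
func allNodesHaveSameTermMin(nodes []*Raft, min int) bool {
	if len(nodes) == 0 {
		return false
	}
	term := nodes[0].GetCurrentTerm()
	if term <= min {
		return false
	}
	for _, n := range nodes[1:] {
		if n.GetCurrentTerm() != term {
			return false
		}
	}
	return true
}
```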

+func TestRaft_LeaderDisconnect_2_nodes(t *testing.T) {
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
+    cleanup, testItems := setupRaftCluster(ctx, cancel, 2, t)
+    defer func() {
+        if cleanupErr := cleanup(); cleanupErr != nil {
+            t.Logf("Cleanup failed: %v", cleanupErr)
+        }
+    }()
+
+    for _, raftNode := range testItems.RaftNodes {
+        go raftNode.Run(ctx)
+    }
+
+    WaitForCondition(ctx, t, func() bool {
+        return allNodesHaveSameTerm(testItems.RaftNodes)
+    })
+
+    t.Run("Leader disconnect and reelection", func(t *testing.T) {
+        var prevLeaderID string
+        var oldLeaderIndex int
+        var lastTerm int
+        for i, node := range testItems.RaftNodes {
+            if node.GetRole() == Leader {
+                oldLeaderIndex = i
+                lastTerm = node.GetCurrentTerm()
+                prevLeaderID = node.GetHostId()
+                testItems.Hosts[i].Close()
+                break
+            }
+        }
+        assert.NotEmpty(t, prevLeaderID)
+
+        newList := []*Raft{}
+        for i, node := range testItems.RaftNodes {
+            if i == oldLeaderIndex {
+                continue
+            }
+            newList = append(newList, node)
+        }
+
+        WaitForCondition(ctx, t, func() bool {
+            return allNodesHaveSameTermMin(newList, lastTerm)
+        })
+
+        var newLeaderID string
+        for i, node := range testItems.RaftNodes {
+            if i == oldLeaderIndex {
+                continue
+            }
+            assert.Equal(t, 1, node.SubscribersCount())
+            if node.GetRole() == Leader {
+                newLeaderID = node.GetHostId()
+                break
+            }
+        }
+
+        assert.NotEmpty(t, newLeaderID)
+        assert.NotEqual(t, prevLeaderID, newLeaderID)
+    })
+}
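Closing the old leader's libp2p host (testItems.Hosts[i].Close()) shuts down all of its connections and streams at once, which is the cheapest stand-in for a crash; the surviving node must then detect the silence and win a fresh election, which the allNodesHaveSameTermMin wait above captures before the leadership assertions run.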

+// func TestRaft_NetworkPartitionAndLeaderMismatch(t *testing.T) {
+//     ctx, cancel := context.WithCancel(context.Background())
+//     defer cancel()
+
+//     // Set up the test Raft cluster
+//     cleanup, testItems := setupRaftCluster(ctx, cancel, 2, t)
+//     defer func() {
+//         if cleanupErr := cleanup(); cleanupErr != nil {
+//             t.Logf("Cleanup failed: %v", cleanupErr)
+//         }
+//     }()
+
+//     // Start running all raft nodes
+//     for _, raftNode := range testItems.RaftNodes {
+//         go raftNode.Run(ctx)
+//     }
+
+//     // Wait for initial leader election
+//     WaitForCondition(ctx, t, func() bool {
+//         return allNodesHaveSameTerm(testItems.RaftNodes)
+//     })
+
+//     t.Run("Network partition between Node A and Node B", func(t *testing.T) {
+//         // Disconnect Node A (index 0) from Node B (index 1)
+//         // Assuming Node A is the leader initially
+//         var hostIdx int
+//         for i, node := range testItems.RaftNodes {
+//             if node.LeaderID == node.GetHostId() {
+//                 hostIdx = i
+//                 break
+//             }
+//         }
+
+//         // Disconnect Node A from Node B to simulate the partition
+//         testItems.Hosts[hostIdx].Close() // This simulates network failure on Node A
+
+//         // Wait for the partition to cause a re-election
+//         time.Sleep(time.Second * 5) // Simulate some delay to trigger timeouts
+
+//         // Node B (index 1) should attempt to become leader since it can't reach the leader
+//         WaitForCondition(ctx, t, func() bool {
+//             return testItems.RaftNodes[1].GetRole() == Candidate
+//         })
+
+//         // Node B may become leader in this case
+//         WaitForCondition(ctx, t, func() bool {
+//             return testItems.RaftNodes[1].GetRole() == Leader
+//         })
+
+//         // Now, simulate Node A reconnecting
+//         newHost, err := libp2psetup.NewHost(ctx)
+//         assert.NoError(t, err)
+//         testItems.Hosts[0] = newHost
+//         newPs, err := libp2psetup.MakePubsub(ctx, newHost)
+//         assert.NoError(t, err)
+//         testItems.Pss[0] = newPs
+//         newTopic, err := newPs.Join(topicString)
+//         assert.NoError(t, err)
+//         newRaftNode := NewRaftNode(newHost, newPs, newTopic, 100, time.Second)
+//         go newRaftNode.Run(ctx)
+//         testItems.RaftNodes[0] = newRaftNode
+
+//         // Reconnect Node A to the rest of the cluster
+//         for _, host := range testItems.Hosts[1:] {
+//             newHost.Connect(ctx, peer.AddrInfo{ID: host.ID(), Addrs: host.Addrs()})
+//         }
+
+//         // Wait for Node A to rejoin the cluster and handle the mismatch in leader status
+//         time.Sleep(time.Second * 10) // Let nodes sync up
+
+//         // Ensure there's a consistent leader and no multiple leaders
+//         leaderIds := make(map[string]struct{})
+//         for _, node := range testItems.RaftNodes {
+//             leader := node.GetLeader()
+//             leaderIds[leader] = struct{}{}
+//         }
+
+//         // There should only be one leader after Node A rejoins
+//         assert.Equal(t, 1, len(leaderIds), "Expected a single leader after reconnection")
+
+//         // Check that all nodes are synchronized in terms of the current term
+//         WaitForCondition(ctx, t, func() bool {
+//             return allNodesHaveSameTerm(testItems.RaftNodes)
+//         })
+//     })
+// }
