Skip to content

Commit

Permalink
Merge branch 'main' into fix.deadlock
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Nov 15, 2024
2 parents 84d2a52 + c2a822f commit 44bdee0
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 4 deletions.
4 changes: 4 additions & 0 deletions common/error_codes.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ const (
CodeInvalidSessionTimeout codes.Code = 109
CodeNamespaceNotFound codes.Code = 110
CodeNotificationsNotEnabled codes.Code = 111
CodeFollowerAlreadyPresent codes.Code = 112
CodeFollowerAlreadyFenced codes.Code = 113
)

var (
Expand All @@ -47,4 +49,6 @@ var (
ErrorInvalidSessionTimeout = status.Error(CodeInvalidSessionTimeout, "oxia: invalid session timeout")
ErrorNamespaceNotFound = status.Error(CodeNamespaceNotFound, "oxia: namespace not found")
ErrorNotificationsNotEnabled = status.Error(CodeNotificationsNotEnabled, "oxia: notifications not enabled on namespace")
ErrorFollowerAlreadyPresent = status.Error(CodeFollowerAlreadyPresent, "oxia: follower is already present")
ErrorFollowerAlreadyFenced = status.Error(CodeFollowerAlreadyFenced, "oxia: follower is already fenced")
)
4 changes: 2 additions & 2 deletions coordinator/impl/shard_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ func (s *shardController) newTermAndAddFollower(ctx context.Context, node model.

func (s *shardController) internalNewTermAndAddFollower(ctx context.Context, node model.ServerAddress, res chan error) {
fr, err := s.newTerm(ctx, node)
if err != nil {
if err != nil && status.Code(err) != common.CodeFollowerAlreadyFenced {
res <- err
return
}
Expand All @@ -512,7 +512,7 @@ func (s *shardController) internalNewTermAndAddFollower(ctx context.Context, nod
if err = s.addFollower(*s.shardMetadata.Leader, node.Internal, &proto.EntryId{
Term: fr.Term,
Offset: fr.Offset,
}); err != nil {
}); err != nil && status.Code(err) != common.CodeFollowerAlreadyPresent {
res <- err
return
}
Expand Down
2 changes: 1 addition & 1 deletion server/follower_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func (fc *followerController) NewTerm(req *proto.NewTermRequest) (*proto.NewTerm
slog.Int64("new-term", req.Term),
slog.Any("status", fc.status),
)
return nil, common.ErrorInvalidStatus
return nil, common.ErrorFollowerAlreadyFenced
}

if fc.db == nil {
Expand Down
35 changes: 35 additions & 0 deletions server/follower_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,41 @@ func TestFollower_NewTerm(t *testing.T) {
assert.NoError(t, walFactory.Close())
}

// TestFollower_DuplicateNewTermInFollowerState verifies that a follower
// controller which has already accepted term 1 and acknowledged a replicated
// entry rejects a second NewTerm request for the same term with
// common.CodeFollowerAlreadyFenced.
func TestFollower_DuplicateNewTermInFollowerState(t *testing.T) {
	var shardId int64 = 5
	kvFactory, err := kv.NewPebbleKVFactory(&kv.FactoryOptions{DataDir: t.TempDir()})
	assert.NoError(t, err)
	walFactory := wal.NewWalFactory(&wal.FactoryOptions{BaseWalDir: t.TempDir()})

	// Surface setup failures here instead of letting them show up later as a
	// nil dereference or a misleading assertion failure.
	fc, err := NewFollowerController(Config{}, common.DefaultNamespace, shardId, walFactory, kvFactory)
	assert.NoError(t, err)

	_, err = fc.NewTerm(&proto.NewTermRequest{Term: 1})
	assert.NoError(t, err)

	stream := newMockServerReplicateStream()
	go func() {
		// cancelled due to fc.Close() below
		assert.ErrorIs(t, fc.Replicate(stream), context.Canceled)
	}()

	stream.AddRequest(createAddRequest(t, 1, 0, map[string]string{"a": "0", "b": "1"}, 10))

	// Wait for acks
	r1 := stream.GetResponse()

	assert.EqualValues(t, 0, r1.Offset)

	assert.Eventually(t, func() bool {
		return fc.CommitOffset() == 0
	}, 10*time.Second, 10*time.Millisecond)

	// Repeating NewTerm with the term the follower is already on must be
	// reported with the dedicated "already fenced" code and no response.
	r, err := fc.NewTerm(&proto.NewTermRequest{Term: 1})
	assert.Nil(t, r)
	assert.Equal(t, common.CodeFollowerAlreadyFenced, status.Code(err))

	assert.NoError(t, fc.Close())
	assert.NoError(t, kvFactory.Close())
	assert.NoError(t, walFactory.Close())
}

// If a node is restarted, it might get the truncate request
// when it's in the `NotMember` state. That is ok, provided
// the request comes in the same term that the follower
Expand Down
2 changes: 1 addition & 1 deletion server/leader_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ func (lc *leaderController) AddFollower(req *proto.AddFollowerRequest) (*proto.A
}

if _, followerAlreadyPresent := lc.followers[req.FollowerName]; followerAlreadyPresent {
return nil, errors.Errorf("follower %s is already present", req.FollowerName)
return nil, errors.Wrapf(common.ErrorFollowerAlreadyPresent, "follower: %s", req.FollowerName)
}

if len(lc.followers) == int(lc.replicationFactor)-1 {
Expand Down
53 changes: 53 additions & 0 deletions server/leader_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,59 @@ func TestLeaderController_AddFollower(t *testing.T) {
assert.NoError(t, walFactory.Close())
}

// TestLeaderController_AddFollowerRepeated verifies that asking the leader to
// add a follower that is already in its follower set fails with
// common.CodeFollowerAlreadyPresent, and keeps failing the same way when the
// request is repeated.
func TestLeaderController_AddFollowerRepeated(t *testing.T) {
	var shard int64 = 1

	kvFactory, err := kv.NewPebbleKVFactory(testKVOptions)
	assert.NoError(t, err)
	walFactory := newTestWalFactory(t)

	lc, err := NewLeaderController(Config{}, common.DefaultNamespace, shard, newMockRpcClient(), walFactory, kvFactory)
	assert.NoError(t, err)

	_, err = lc.NewTerm(&proto.NewTermRequest{
		Term:  5,
		Shard: shard,
	})
	assert.NoError(t, err)

	assert.EqualValues(t, 5, lc.Term())
	assert.Equal(t, proto.ServingStatus_FENCED, lc.Status())

	_, err = lc.BecomeLeader(context.Background(), &proto.BecomeLeaderRequest{
		Shard:             shard,
		Term:              5,
		ReplicationFactor: 3,
		FollowerMaps: map[string]*proto.EntryId{
			"f1": InvalidEntryId,
		},
	})
	assert.NoError(t, err)

	// f1 is already connected (it was part of the BecomeLeader follower map),
	// so the request must be rejected with the dedicated status code rather
	// than with an arbitrary error.
	afRes, err := lc.AddFollower(&proto.AddFollowerRequest{
		Shard:               shard,
		Term:                5,
		FollowerName:        "f1",
		FollowerHeadEntryId: InvalidEntryId,
	})
	assert.Nil(t, afRes)
	assert.Error(t, err)
	assert.Equal(t, common.CodeFollowerAlreadyPresent, status.Code(err))

	// Repeating the identical request must yield the same outcome.
	afRes, err = lc.AddFollower(&proto.AddFollowerRequest{
		Shard:               shard,
		Term:                5,
		FollowerName:        "f1",
		FollowerHeadEntryId: InvalidEntryId,
	})
	assert.Nil(t, afRes)
	assert.Error(t, err)
	assert.Equal(t, common.CodeFollowerAlreadyPresent, status.Code(err))

	assert.NoError(t, lc.Close())
	assert.NoError(t, kvFactory.Close())
	assert.NoError(t, walFactory.Close())
}

// When a follower is added after the initial leader election,
// the leader should use the head-entry at the time of the
// election instead of the current head-entry.
Expand Down

0 comments on commit 44bdee0

Please sign in to comment.