diff --git a/common/error_codes.go b/common/error_codes.go index 8d99363e..b4205c3f 100644 --- a/common/error_codes.go +++ b/common/error_codes.go @@ -33,6 +33,7 @@ const ( CodeNamespaceNotFound codes.Code = 110 CodeNotificationsNotEnabled codes.Code = 111 CodeFollowerAlreadyPresent codes.Code = 112 + CodeFollowerAlreadyFenced codes.Code = 113 ) var ( @@ -49,4 +50,5 @@ var ( ErrorNamespaceNotFound = status.Error(CodeNamespaceNotFound, "oxia: namespace not found") ErrorNotificationsNotEnabled = status.Error(CodeNotificationsNotEnabled, "oxia: notifications not enabled on namespace") ErrorFollowerAlreadyPresent = status.Error(CodeFollowerAlreadyPresent, "oxia: follower is already present") + ErrorFollowerAlreadyFenced = status.Error(CodeFollowerAlreadyFenced, "oxia: follower is already fenced") ) diff --git a/coordinator/impl/shard_controller.go b/coordinator/impl/shard_controller.go index 536ed272..c299d76c 100644 --- a/coordinator/impl/shard_controller.go +++ b/coordinator/impl/shard_controller.go @@ -498,7 +498,7 @@ func (s *shardController) newTermAndAddFollower(ctx context.Context, node model. func (s *shardController) internalNewTermAndAddFollower(ctx context.Context, node model.ServerAddress, res chan error) { fr, err := s.newTerm(ctx, node) - if err != nil { + if err != nil && status.Code(err) != common.CodeFollowerAlreadyFenced { res <- err return } diff --git a/server/follower_controller.go b/server/follower_controller.go index 6fcbcdfa..018ef4a5 100644 --- a/server/follower_controller.go +++ b/server/follower_controller.go @@ -275,7 +275,7 @@ func (fc *followerController) NewTerm(req *proto.NewTermRequest) (*proto.NewTerm slog.Int64("new-term", req.Term), slog.Any("status", fc.status), ) - return nil, common.ErrorInvalidStatus + return nil, common.ErrorFollowerAlreadyFenced } if fc.db == nil { diff --git a/server/follower_controller_test.go b/server/follower_controller_test.go index 8f776fa1..c610efb9 100644 --- a/server/follower_controller_test.go +++ b/server/follower_controller_test.go @@ -357,6 +357,41 @@ func TestFollower_NewTerm(t *testing.T) { assert.NoError(t, walFactory.Close()) } +func TestFollower_DuplicateNewTermInFollowerState(t *testing.T) { + var shardId int64 = 5 + kvFactory, err := kv.NewPebbleKVFactory(&kv.FactoryOptions{DataDir: t.TempDir()}) + assert.NoError(t, err) + walFactory := wal.NewWalFactory(&wal.FactoryOptions{BaseWalDir: t.TempDir()}) + + fc, _ := NewFollowerController(Config{}, common.DefaultNamespace, shardId, walFactory, kvFactory) + _, _ = fc.NewTerm(&proto.NewTermRequest{Term: 1}) + + stream := newMockServerReplicateStream() + go func() { + // cancelled due to fc.Close() below + assert.ErrorIs(t, fc.Replicate(stream), context.Canceled) + }() + + stream.AddRequest(createAddRequest(t, 1, 0, map[string]string{"a": "0", "b": "1"}, 10)) + + // Wait for acks + r1 := stream.GetResponse() + + assert.EqualValues(t, 0, r1.Offset) + + assert.Eventually(t, func() bool { + return fc.CommitOffset() == 0 + }, 10*time.Second, 10*time.Millisecond) + + r, err := fc.NewTerm(&proto.NewTermRequest{Term: 1}) + assert.Nil(t, r) + assert.Equal(t, common.CodeFollowerAlreadyFenced, status.Code(err)) + + assert.NoError(t, fc.Close()) + assert.NoError(t, kvFactory.Close()) + assert.NoError(t, walFactory.Close()) +} + // If a node is restarted, it might get the truncate request // when it's in the `NotMember` state. That is ok, provided // the request comes in the same term that the follower