Skip to content

Commit

Permalink
feat(tidbgroup): refactor to use task v3 (#6018)
Browse files Browse the repository at this point in the history
Signed-off-by: liubo02 <[email protected]>
  • Loading branch information
liubog2008 authored Jan 3, 2025
1 parent 8ddd9ee commit 8468aa9
Show file tree
Hide file tree
Showing 29 changed files with 1,646 additions and 863 deletions.
25 changes: 8 additions & 17 deletions pkg/controllers/common/cond.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@

package common

import "github.com/pingcap/tidb-operator/pkg/utils/task/v3"
import (
"github.com/pingcap/tidb-operator/pkg/runtime"
"github.com/pingcap/tidb-operator/pkg/utils/task/v3"
)

func CondPDHasBeenDeleted(ctx PDState) task.Condition {
return task.CondFunc(func() bool {
Expand All @@ -40,26 +43,14 @@ func CondClusterIsPaused(ctx ClusterState) task.Condition {
})
}

func CondPDGroupHasBeenDeleted(ctx PDGroupState) task.Condition {
func CondGroupIsDeleting[G runtime.Group](state GroupState[G]) task.Condition {
return task.CondFunc(func() bool {
return ctx.PDGroup() == nil
return !state.Group().GetDeletionTimestamp().IsZero()
})
}

func CondPDGroupIsDeleting(ctx PDGroupState) task.Condition {
func CondGroupHasBeenDeleted[RG runtime.GroupT[G], G runtime.GroupSet](state GroupState[RG]) task.Condition {
return task.CondFunc(func() bool {
return !ctx.PDGroup().GetDeletionTimestamp().IsZero()
})
}

func CondTiKVGroupHasBeenDeleted(ctx TiKVGroupState) task.Condition {
return task.CondFunc(func() bool {
return ctx.TiKVGroup() == nil
})
}

func CondTiKVGroupIsDeleting(ctx TiKVGroupState) task.Condition {
return task.CondFunc(func() bool {
return !ctx.TiKVGroup().GetDeletionTimestamp().IsZero()
return state.Group() == nil
})
}
95 changes: 95 additions & 0 deletions pkg/controllers/common/cond_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,106 @@ import (
"testing"

"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/pingcap/tidb-operator/apis/core/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/runtime"
"github.com/pingcap/tidb-operator/pkg/utils/fake"
)

func TestCondGroupHasBeenDeleted(t *testing.T) {
t.Run("PDGroup", testCondGroupHasBeenDeleted[runtime.PDGroup])
t.Run("TiDBGroup", testCondGroupHasBeenDeleted[runtime.TiDBGroup])
t.Run("TiKVGroup", testCondGroupHasBeenDeleted[runtime.TiKVGroup])
t.Run("TiFlashGroup", testCondGroupHasBeenDeleted[runtime.TiFlashGroup])
}

func testCondGroupHasBeenDeleted[
G runtime.GroupSet,
RG runtime.GroupT[G],
](t *testing.T) {
cases := []struct {
desc string
state GroupState[RG]
expectedCond bool
}{
{
desc: "cond is false",
state: FakeGroupState(
fake.Fake(func(obj RG) RG {
obj.SetName("test")
return obj
}),
),
},
{
desc: "cond is true",
state: FakeGroupState[RG](nil),
expectedCond: true,
},
}

for i := range cases {
c := &cases[i]
t.Run(c.desc, func(tt *testing.T) {
tt.Parallel()

cond := CondGroupHasBeenDeleted(c.state)
assert.Equal(tt, c.expectedCond, cond.Satisfy(), c.desc)
})
}
}

func TestCondGroupIsDeleting(t *testing.T) {
t.Run("PDGroup", testCondGroupIsDeleting[runtime.PDGroup])
t.Run("TiDBGroup", testCondGroupIsDeleting[runtime.TiDBGroup])
t.Run("TiKVGroup", testCondGroupIsDeleting[runtime.TiKVGroup])
t.Run("TiFlashGroup", testCondGroupIsDeleting[runtime.TiFlashGroup])
}

func testCondGroupIsDeleting[
G runtime.GroupSet,
RG runtime.GroupT[G],
](t *testing.T) {
cases := []struct {
desc string
state GroupState[RG]
expectedCond bool
}{
{
desc: "cond is false",
state: FakeGroupState(
fake.Fake(func(obj RG) RG {
obj.SetName("test")
return obj
}),
),
},
{
desc: "cond is true",
state: FakeGroupState(
fake.Fake(func(obj RG) RG {
obj.SetName("test")
now := metav1.Now()
obj.SetDeletionTimestamp(&now)
return obj
}),
),
expectedCond: true,
},
}

for i := range cases {
c := &cases[i]
t.Run(c.desc, func(tt *testing.T) {
tt.Parallel()

cond := CondGroupIsDeleting(c.state)
assert.Equal(tt, c.expectedCond, cond.Satisfy(), c.desc)
})
}
}

func TestCondPDHasBeenDeleted(t *testing.T) {
cases := []struct {
desc string
Expand Down
65 changes: 65 additions & 0 deletions pkg/controllers/common/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@ import (
"fmt"
"slices"
"strings"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kuberuntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
utilerr "k8s.io/apimachinery/pkg/util/errors"

"github.com/pingcap/tidb-operator/apis/core/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/client"
Expand Down Expand Up @@ -121,6 +124,11 @@ func TaskContextTiKVGroup(state TiKVGroupStateInitializer, c client.Client) task
return taskContextResource("TiKVGroup", w, c, false)
}

func TaskContextTiDBGroup(state TiDBGroupStateInitializer, c client.Client) task.Task {
w := state.TiDBGroupInitializer()
return taskContextResource("TiDBGroup", w, c, false)
}

func TaskContextPDSlice(state PDSliceStateInitializer, c client.Client) task.Task {
w := state.PDSliceInitializer()
return taskContextResourceSlice[v1alpha1.PD, v1alpha1.PDList]("PDSlice", w, c)
Expand All @@ -131,6 +139,11 @@ func TaskContextTiKVSlice(state TiKVSliceStateInitializer, c client.Client) task
return taskContextResourceSlice[v1alpha1.TiKV, v1alpha1.TiKVList]("TiKVSlice", w, c)
}

func TaskContextTiDBSlice(state TiDBSliceStateInitializer, c client.Client) task.Task {
w := state.TiDBSliceInitializer()
return taskContextResourceSlice[v1alpha1.TiDB, v1alpha1.TiDBList]("TiDBSlice", w, c)
}

func TaskSuspendPod(state PodState, c client.Client) task.Task {
return task.NameTaskFunc("SuspendPod", func(ctx context.Context) task.Result {
pod := state.Pod()
Expand Down Expand Up @@ -165,6 +178,58 @@ func TaskGroupFinalizerAdd[
})
}

const defaultDelWaitTime = 10 * time.Second

func TaskGroupFinalizerDel[
GT runtime.GroupTuple[OG, RG],
IT runtime.InstanceTuple[OI, RI],
OG client.Object,
RG runtime.Group,
OI client.Object,
RI runtime.Instance,
](state GroupAndInstanceSliceState[RG, RI], c client.Client) task.Task {
var it IT
var gt GT
return task.NameTaskFunc("FinalizerDel", func(ctx context.Context) task.Result {
var errList []error
var names []string
for _, peer := range state.Slice() {
names = append(names, peer.GetName())
if peer.GetDeletionTimestamp().IsZero() {
if err := c.Delete(ctx, it.To(peer)); err != nil {
if errors.IsNotFound(err) {
continue
}
errList = append(errList, fmt.Errorf("try to delete the instance %v failed: %w", peer.GetName(), err))
continue
}
}
}

if len(errList) != 0 {
return task.Fail().With("failed to delete all instances: %v", utilerr.NewAggregate(errList))
}

if len(names) != 0 {
return task.Retry(defaultDelWaitTime).With("wait for all instances being removed, %v still exists", names)
}

wait, err := k8s.DeleteGroupSubresource(ctx, c, state.Group(), &corev1.ServiceList{})
if err != nil {
return task.Fail().With("cannot delete subresources: %w", err)
}
if wait {
return task.Wait().With("wait all subresources deleted")
}

if err := k8s.RemoveFinalizer(ctx, c, gt.To(state.Group())); err != nil {
return task.Fail().With("failed to ensure finalizer has been removed: %w", err)
}

return task.Complete().With("finalizer has been removed")
})
}

func TaskGroupStatusSuspend[
GT runtime.GroupTuple[OG, RG],
OG client.Object,
Expand Down
Loading

0 comments on commit 8468aa9

Please sign in to comment.