Skip to content

Commit

Permalink
feat: test reconcile logic (#24)
Browse files Browse the repository at this point in the history
  • Loading branch information
whg517 authored May 23, 2024
1 parent 18ee595 commit 2eb6bd4
Show file tree
Hide file tree
Showing 14 changed files with 125 additions and 52 deletions.
4 changes: 2 additions & 2 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/metrics/server"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"

supersetv1alpha1 "github.com/zncdatadev/superset-operator/api/v1alpha1"
"github.com/zncdatadev/superset-operator/internal/controller"
Expand Down Expand Up @@ -67,7 +67,7 @@ func main() {

mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
Metrics: server.Options{BindAddress: metricsAddr},
Metrics: metricsserver.Options{BindAddress: metricsAddr},
HealthProbeBindAddress: probeAddr,
LeaderElection: enableLeaderElection,
LeaderElectionID: "e19e02e9.zncdata.dev",
Expand Down
1 change: 0 additions & 1 deletion internal/controller/cluster/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ func (b *JobBuilder) mainContainer() *corev1.Container {
containerBuilder.AddVolumeMount(volumeMount)
// SetCommand([]string{"/bin/sh", "-c", ". /app/pythonpath/superset_bootstrap.sh; . /app/pythonpath/superset_init.sh"})
containerBuilder.SetCommand([]string{"tail", "-f"})
containerBuilder.AddEnvFromSecret(b.ClusterConfig.ConfigSecretName)
containerBuilder.AddEnvFromSecret(b.ClusterConfig.EnvSecretName)

existAdminSecretName := b.ClusterConfig.Spec.Administrator.ExistSecret
Expand Down
6 changes: 3 additions & 3 deletions internal/controller/common/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (b *DeploymentBuilder) GetMainContainer() builder.ContainerBuilder {
)
containerBuilder.AddEnvFromSecret(b.ClusterConfig.EnvSecretName)
containerBuilder.SetCommand([]string{"/bin/sh", "-c", ". /app/pythonpath/superset_bootstrap.sh; /usr/bin/run-server.sh"})
containerBuilder.AddVolumeMount(corev1.VolumeMount{Name: "superset-config", MountPath: "/app/superset", ReadOnly: true})
containerBuilder.AddVolumeMount(corev1.VolumeMount{Name: "superset-config", MountPath: "/app/pythonpath", ReadOnly: true})
containerBuilder.AddPorts(b.Options.GetPorts())
containerBuilder.SetProbeWithHealth()
return containerBuilder
Expand All @@ -47,7 +47,7 @@ func (b *DeploymentBuilder) GetMainContainer() builder.ContainerBuilder {
func (b *DeploymentBuilder) GetInitContainer() builder.ContainerBuilder {
containerBuilder := builder.NewGenericContainerBuilder(
"wait-for-postgres-redis",
b.Options.GetImage().String(),
"apache/superset:dockerize",
b.Options.GetImage().PullPolicy,
)
containerBuilder.SetCommand([]string{"/bin/sh", "-c", "dockerize -wait \"tcp://$DB_HOST:$DB_PORT\" -wait \"tcp://$REDIS_HOST:$REDIS_PORT\" -timeout 120s"})
Expand All @@ -60,7 +60,7 @@ func (b *DeploymentBuilder) GetVolume() *corev1.Volume {
Name: "superset-config",
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: b.ClusterConfig.EnvSecretName,
SecretName: b.ClusterConfig.ConfigSecretName,
},
},
}
Expand Down
1 change: 1 addition & 0 deletions internal/controller/node/role.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func (r *Reconciler) RegisterResourceWithRoleGroup(
EnvOverrides: roleGroup.EnvOverrides,
//PodOverrides: roleGroup.PodOverrides, TODO: Uncomment this line
}
roleGroupOptions.SetPorts(Ports)

service := reconciler.NewServiceReconciler(
r.Client,
Expand Down
12 changes: 0 additions & 12 deletions internal/controller/worker/port.go

This file was deleted.

6 changes: 0 additions & 6 deletions internal/controller/worker/role.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,6 @@ func (r *Reconciler) RegisterResourceWithRoleGroup(
//PodOverrides: roleGroup.PodOverrides, TODO: Uncomment this line
}

service := reconciler.NewServiceReconciler(
r.Client,
roleGroupOptions,
)
r.AddResource(service)

deployment := NewDeploymentReconciler(
r.Client,
r.ClusterConfig,
Expand Down
1 change: 1 addition & 0 deletions pkg/builder/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func (b *GenericDeploymentBuilder) GetObject() *appv1.Deployment {
b.replicas = &DefaultReplicas
}
obj := &appv1.Deployment{
ObjectMeta: b.GetObjectMeta(),
Spec: appv1.DeploymentSpec{
Replicas: b.replicas,
Selector: &metav1.LabelSelector{
Expand Down
10 changes: 10 additions & 0 deletions pkg/builder/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ var (
)

type Options interface {
GetClusterName() string
GetName() string
GetNamespace() string
GetFullName() string
GetLabels() map[string]string
AddLabels(labels map[string]string)
Expand All @@ -41,10 +43,18 @@ type ClusterOptions struct {
Ports []corev1.ContainerPort
}

func (o *ClusterOptions) GetClusterName() string {
return o.Name
}

func (o *ClusterOptions) GetName() string {
return o.Name
}

func (o *ClusterOptions) GetNamespace() string {
return o.Namespace
}

func (o *ClusterOptions) GetFullName() string {
return o.Name
}
Expand Down
92 changes: 75 additions & 17 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,17 @@ import (
"github.com/cisco-open/k8s-objectmatcher/patch"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
)

var (
clientLogger = ctrl.Log.WithName("resourceClient")
clientLogger = ctrl.Log.WithName("client")
)

type Client struct {
Expand Down Expand Up @@ -70,30 +71,68 @@ func (c *Client) Get(ctx context.Context, obj ctrlclient.Object) error {
return nil
}

func (c *Client) SetOwnerReference(obj ctrlclient.Object, gvk *schema.GroupVersionKind) error {

if obj.GetNamespace() == "" {
clientLogger.V(5).Info("Skip setting owner reference for object without namespace, it maybe a cluster-scoped resource",
"gvk", gvk,
"name", obj.GetName(),
)
return nil
}
if err := ctrl.SetControllerReference(c.OwnerReference, obj, c.Client.Scheme()); err != nil {
clientLogger.Error(err, "Failed to set owner reference",
"gvk", gvk,
"namespace", obj.GetNamespace(),
"name", obj.GetName(),
)
return err
}

clientLogger.V(5).Info("Set owner reference for object",
"gvk", gvk, "namespace",
obj.GetNamespace(),
"name", obj.GetName(),
"owner", c.OwnerReference.GetName(),
)

return nil
}

func (c *Client) CreateOrUpdate(ctx context.Context, obj ctrlclient.Object) (mutation bool, err error) {

key := ctrlclient.ObjectKeyFromObject(obj)
objectKey := ctrlclient.ObjectKeyFromObject(obj)
namespace := obj.GetNamespace()
kinds, _, _ := c.Client.Scheme().ObjectKinds(obj)

gvk, err := GetObjectGVK(obj)
if err != nil {
return false, err
}

name := obj.GetName()

clientLogger.V(5).Info("Creating or updating object", "Kind", kinds, "Namespace", namespace, "Name", name)
if err := c.SetOwnerReference(obj, gvk); err != nil {
return false, err
}

clientLogger.V(5).Info("Creating or updating object", "gvk", gvk, "namespace", namespace, "name", name)

current := obj.DeepCopyObject().(ctrlclient.Object)
// Check if the object exists, if not create a new one
err = c.Client.Get(ctx, key, current)
err = c.Client.Get(ctx, objectKey, current)
var calculateOpt = []patch.CalculateOption{
patch.IgnoreStatusFields(),
}
if errors.IsNotFound(err) {
if apierrors.IsNotFound(err) {
clientLogger.V(1).Info("Resource not found, creating a new.", "gvk", gvk, "namespace", namespace, "name", name)
if err := patch.DefaultAnnotator.SetLastAppliedAnnotation(obj); err != nil {
return false, err
}
clientLogger.Info("Creating a new object", "Kind", kinds, "Namespace", namespace, "Name", name)

if err := c.Client.Create(ctx, obj); err != nil {
clientLogger.Error(err, "Failed to create resource", "gvk", gvk, "namespace", namespace, "name", name)
return false, err
}
clientLogger.V(5).Info("Resource created", "gvk", gvk, "namespace", namespace, "name", name)
return true, nil
} else if err == nil {
switch obj.(type) {
Expand All @@ -117,39 +156,58 @@ func (c *Client) CreateOrUpdate(ctx context.Context, obj ctrlclient.Object) (mut
case *appsv1.StatefulSet:
calculateOpt = append(calculateOpt, patch.IgnoreVolumeClaimTemplateTypeMetaAndStatus())
}

result, err := patch.DefaultPatchMaker.Calculate(current, obj, calculateOpt...)
if err != nil {
clientLogger.Error(err, "failed to calculate patch to match objects, moving on to update")
clientLogger.Error(err, "Failed to calculate patch to match objects, moving to update", "gvk", gvk, "namespace", namespace, "name", name)
// if there is an error with matching, we still want to update
resourceVersion := current.(metav1.ObjectMetaAccessor).GetObjectMeta().GetResourceVersion()
obj.(metav1.ObjectMetaAccessor).GetObjectMeta().SetResourceVersion(resourceVersion)

if err := c.Client.Update(ctx, obj); err != nil {
clientLogger.Error(err, "Failed to update resource", "gvk", gvk, "namespace", namespace, "name", name)
return false, err
}
clientLogger.V(5).Info("Resource updated", "gvk", gvk, "namespace", namespace, "name", name)
return true, nil
}

if !result.IsEmpty() {
clientLogger.Info(
fmt.Sprintf("Resource update for object %s:%s", kinds, obj.(metav1.ObjectMetaAccessor).GetObjectMeta().GetName()),
"patch", string(result.Patch),
)
clientLogger.V(1).Info("Resource modified, updating", "gvk", gvk, "namespace", namespace, "name", name)
// ignore the update if the object is a secret
if _, ok := obj.(*corev1.Secret); !ok {
clientLogger.V(1).Info("Patch result", "gvk", gvk, "namespace", namespace, "name", name, "patch", string(result.Patch))
}

if err := patch.DefaultAnnotator.SetLastAppliedAnnotation(obj); err != nil {
clientLogger.Error(err, "failed to annotate modified object", "object", obj)
clientLogger.Error(err, "Failed to update object annotation, moving to update", "gvk", gvk, "namespace", namespace, "name", name)
}

resourceVersion := current.(metav1.ObjectMetaAccessor).GetObjectMeta().GetResourceVersion()
obj.(metav1.ObjectMetaAccessor).GetObjectMeta().SetResourceVersion(resourceVersion)

if err = c.Client.Update(ctx, obj); err != nil {
clientLogger.Error(err, "Failed to update resource", "gvk", gvk, "namespace", namespace, "name", name)
return false, err
}
return true, nil
}
clientLogger.V(1).Info(fmt.Sprintf("Skipping update for object %s:%s", kinds, obj.(metav1.ObjectMetaAccessor).GetObjectMeta().GetName()))

clientLogger.V(1).Info("Skipping update for object", "gvk", gvk, "namespace", namespace, "name", name)
}
return false, err
}

func GetObjectGVK(obj ctrlclient.Object) (*schema.GroupVersionKind, error) {
gvks, _, err := scheme.Scheme.ObjectKinds(obj)
if err != nil {
return nil, err
}

if len(gvks) == 0 {
return nil, fmt.Errorf("no GroupVersionKind found for object %T", obj)
}

gvk := gvks[0]

return &gvk, nil
}
2 changes: 1 addition & 1 deletion pkg/reconciler/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

var (
logger = ctrl.Log.WithName("common").WithName("reconciler")
logger = ctrl.Log.WithName("reconciler")
)

type ClusterReconciler interface {
Expand Down
11 changes: 8 additions & 3 deletions pkg/reconciler/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,19 @@ func (r *DeploymentReconciler) Reconcile(ctx context.Context) Result {
}

func (r *DeploymentReconciler) Ready(ctx context.Context) Result {
obj := appv1.Deployment{}

obj := appv1.Deployment{
ObjectMeta: r.GetObjectMeta(),
}
logger.V(1).Info("Checking deployment ready", "namespace", obj.Namespace, "name", obj.Name)
if err := r.Client.Get(ctx, &obj); err != nil {
return NewResult(true, 0, err)
}
if obj.Status.ReadyReplicas == *obj.Spec.Replicas {
logger.V(1).Info("Deployment is ready", "namespace", obj.Namespace, "name", obj.Name)
logger.Info("Deployment is ready", "namespace", obj.Namespace, "name", obj.Name, "replicas", *obj.Spec.Replicas, "readyReplicas", obj.Status.ReadyReplicas)
return NewResult(false, 0, nil)
}
logger.V(1).Info("Deployment is not ready", "namespace", obj.Namespace, "name", obj.Name)
logger.Info("Deployment is not ready", "namespace", obj.Namespace, "name", obj.Name, "replicas", *obj.Spec.Replicas, "readyReplicas", obj.Status.ReadyReplicas)
return NewResult(false, 5, nil)
}

Expand All @@ -65,5 +69,6 @@ func NewDeploymentReconciler(
options,
deployBuilder,
),
Options: options,
}
}
6 changes: 6 additions & 0 deletions pkg/reconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
type AnySpec any

type Reconciler interface {
GetName() string
GetNamespace() string
GetClient() *client.Client
GetCtrlClient() ctrlclient.Client
GetCtrlScheme() *runtime.Scheme
Expand All @@ -36,6 +38,10 @@ func (b *BaseReconciler[T]) GetName() string {
return b.Options.GetFullName()
}

func (b *BaseReconciler[T]) GetNamespace() string {
return b.Options.GetNamespace()
}

func (b *BaseReconciler[T]) GetCtrlClient() ctrlclient.Client {
return b.Client.GetCtrlClient()
}
Expand Down
16 changes: 12 additions & 4 deletions pkg/reconciler/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,18 @@ import (

"github.com/zncdatadev/superset-operator/pkg/builder"
"github.com/zncdatadev/superset-operator/pkg/client"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
)

var (
resourceLogger = ctrl.Log.WithName("reconciler").WithName("resource")
)

type ResourceReconciler[B builder.Builder] interface {
Reconciler
GetObjectMeta() metav1.ObjectMeta
GetBuilder() B
ResourceReconcile(ctx context.Context, resource ctrlclient.Object) Result
}
Expand All @@ -38,19 +44,21 @@ func NewGenericResourceReconciler[B builder.Builder](
}
}

func (r *GenericResourceReconciler[b]) GetObjectMeta() metav1.ObjectMeta {
return r.Builder.GetObjectMeta()
}

func (r *GenericResourceReconciler[B]) GetBuilder() B {
return r.Builder
}

func (r *GenericResourceReconciler[B]) ResourceReconcile(ctx context.Context, resource ctrlclient.Object) Result {

if err := ctrl.SetControllerReference(r.Client.OwnerReference, resource, r.GetCtrlScheme()); err != nil {
return NewResult(true, 0, err)
}

if mutation, err := r.Client.CreateOrUpdate(ctx, resource); err != nil {
resourceLogger.Error(err, "Failed to create or update resource", "name", resource.GetName(), "namespace", resource.GetNamespace(), "cluster", r.Options.GetClusterName())
return NewResult(true, 0, err)
} else if mutation {
resourceLogger.Info("Resource created or updated", "name", resource.GetName(), "namespace", resource.GetNamespace(), "cluster", r.Options.GetClusterName())
return NewResult(true, time.Second, nil)
}
return NewResult(false, 0, nil)
Expand Down
Loading

0 comments on commit 2eb6bd4

Please sign in to comment.