Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: test reconcile logic #24

Merged
merged 1 commit into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/metrics/server"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"

supersetv1alpha1 "github.com/zncdatadev/superset-operator/api/v1alpha1"
"github.com/zncdatadev/superset-operator/internal/controller"
Expand Down Expand Up @@ -67,7 +67,7 @@ func main() {

mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
Metrics: server.Options{BindAddress: metricsAddr},
Metrics: metricsserver.Options{BindAddress: metricsAddr},
HealthProbeBindAddress: probeAddr,
LeaderElection: enableLeaderElection,
LeaderElectionID: "e19e02e9.zncdata.dev",
Expand Down
1 change: 0 additions & 1 deletion internal/controller/cluster/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ func (b *JobBuilder) mainContainer() *corev1.Container {
containerBuilder.AddVolumeMount(volumeMount)
// SetCommand([]string{"/bin/sh", "-c", ". /app/pythonpath/superset_bootstrap.sh; . /app/pythonpath/superset_init.sh"})
containerBuilder.SetCommand([]string{"tail", "-f"})
containerBuilder.AddEnvFromSecret(b.ClusterConfig.ConfigSecretName)
containerBuilder.AddEnvFromSecret(b.ClusterConfig.EnvSecretName)

existAdminSecretName := b.ClusterConfig.Spec.Administrator.ExistSecret
Expand Down
6 changes: 3 additions & 3 deletions internal/controller/common/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (b *DeploymentBuilder) GetMainContainer() builder.ContainerBuilder {
)
containerBuilder.AddEnvFromSecret(b.ClusterConfig.EnvSecretName)
containerBuilder.SetCommand([]string{"/bin/sh", "-c", ". /app/pythonpath/superset_bootstrap.sh; /usr/bin/run-server.sh"})
containerBuilder.AddVolumeMount(corev1.VolumeMount{Name: "superset-config", MountPath: "/app/superset", ReadOnly: true})
containerBuilder.AddVolumeMount(corev1.VolumeMount{Name: "superset-config", MountPath: "/app/pythonpath", ReadOnly: true})
containerBuilder.AddPorts(b.Options.GetPorts())
containerBuilder.SetProbeWithHealth()
return containerBuilder
Expand All @@ -47,7 +47,7 @@ func (b *DeploymentBuilder) GetMainContainer() builder.ContainerBuilder {
func (b *DeploymentBuilder) GetInitContainer() builder.ContainerBuilder {
containerBuilder := builder.NewGenericContainerBuilder(
"wait-for-postgres-redis",
b.Options.GetImage().String(),
"apache/superset:dockerize",
b.Options.GetImage().PullPolicy,
)
containerBuilder.SetCommand([]string{"/bin/sh", "-c", "dockerize -wait \"tcp://$DB_HOST:$DB_PORT\" -wait \"tcp://$REDIS_HOST:$REDIS_PORT\" -timeout 120s"})
Expand All @@ -60,7 +60,7 @@ func (b *DeploymentBuilder) GetVolume() *corev1.Volume {
Name: "superset-config",
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: b.ClusterConfig.EnvSecretName,
SecretName: b.ClusterConfig.ConfigSecretName,
},
},
}
Expand Down
1 change: 1 addition & 0 deletions internal/controller/node/role.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func (r *Reconciler) RegisterResourceWithRoleGroup(
EnvOverrides: roleGroup.EnvOverrides,
//PodOverrides: roleGroup.PodOverrides, TODO: Uncomment this line
}
roleGroupOptions.SetPorts(Ports)

service := reconciler.NewServiceReconciler(
r.Client,
Expand Down
12 changes: 0 additions & 12 deletions internal/controller/worker/port.go

This file was deleted.

6 changes: 0 additions & 6 deletions internal/controller/worker/role.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,6 @@ func (r *Reconciler) RegisterResourceWithRoleGroup(
//PodOverrides: roleGroup.PodOverrides, TODO: Uncomment this line
}

service := reconciler.NewServiceReconciler(
r.Client,
roleGroupOptions,
)
r.AddResource(service)

deployment := NewDeploymentReconciler(
r.Client,
r.ClusterConfig,
Expand Down
1 change: 1 addition & 0 deletions pkg/builder/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func (b *GenericDeploymentBuilder) GetObject() *appv1.Deployment {
b.replicas = &DefaultReplicas
}
obj := &appv1.Deployment{
ObjectMeta: b.GetObjectMeta(),
Spec: appv1.DeploymentSpec{
Replicas: b.replicas,
Selector: &metav1.LabelSelector{
Expand Down
10 changes: 10 additions & 0 deletions pkg/builder/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ var (
)

type Options interface {
GetClusterName() string
GetName() string
GetNamespace() string
GetFullName() string
GetLabels() map[string]string
AddLabels(labels map[string]string)
Expand All @@ -41,10 +43,18 @@ type ClusterOptions struct {
Ports []corev1.ContainerPort
}

func (o *ClusterOptions) GetClusterName() string {
return o.Name
}

func (o *ClusterOptions) GetName() string {
return o.Name
}

func (o *ClusterOptions) GetNamespace() string {
return o.Namespace
}

func (o *ClusterOptions) GetFullName() string {
return o.Name
}
Expand Down
92 changes: 75 additions & 17 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,17 @@ import (
"github.com/cisco-open/k8s-objectmatcher/patch"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
)

var (
clientLogger = ctrl.Log.WithName("resourceClient")
clientLogger = ctrl.Log.WithName("client")
)

type Client struct {
Expand Down Expand Up @@ -70,30 +71,68 @@ func (c *Client) Get(ctx context.Context, obj ctrlclient.Object) error {
return nil
}

func (c *Client) SetOwnerReference(obj ctrlclient.Object, gvk *schema.GroupVersionKind) error {

if obj.GetNamespace() == "" {
clientLogger.V(5).Info("Skip setting owner reference for object without namespace, it maybe a cluster-scoped resource",
"gvk", gvk,
"name", obj.GetName(),
)
return nil
}
if err := ctrl.SetControllerReference(c.OwnerReference, obj, c.Client.Scheme()); err != nil {
clientLogger.Error(err, "Failed to set owner reference",
"gvk", gvk,
"namespace", obj.GetNamespace(),
"name", obj.GetName(),
)
return err
}

clientLogger.V(5).Info("Set owner reference for object",
"gvk", gvk, "namespace",
obj.GetNamespace(),
"name", obj.GetName(),
"owner", c.OwnerReference.GetName(),
)

return nil
}

func (c *Client) CreateOrUpdate(ctx context.Context, obj ctrlclient.Object) (mutation bool, err error) {

key := ctrlclient.ObjectKeyFromObject(obj)
objectKey := ctrlclient.ObjectKeyFromObject(obj)
namespace := obj.GetNamespace()
kinds, _, _ := c.Client.Scheme().ObjectKinds(obj)

gvk, err := GetObjectGVK(obj)
if err != nil {
return false, err
}

name := obj.GetName()

clientLogger.V(5).Info("Creating or updating object", "Kind", kinds, "Namespace", namespace, "Name", name)
if err := c.SetOwnerReference(obj, gvk); err != nil {
return false, err
}

clientLogger.V(5).Info("Creating or updating object", "gvk", gvk, "namespace", namespace, "name", name)

current := obj.DeepCopyObject().(ctrlclient.Object)
// Check if the object exists, if not create a new one
err = c.Client.Get(ctx, key, current)
err = c.Client.Get(ctx, objectKey, current)
var calculateOpt = []patch.CalculateOption{
patch.IgnoreStatusFields(),
}
if errors.IsNotFound(err) {
if apierrors.IsNotFound(err) {
clientLogger.V(1).Info("Resource not found, creating a new.", "gvk", gvk, "namespace", namespace, "name", name)
if err := patch.DefaultAnnotator.SetLastAppliedAnnotation(obj); err != nil {
return false, err
}
clientLogger.Info("Creating a new object", "Kind", kinds, "Namespace", namespace, "Name", name)

if err := c.Client.Create(ctx, obj); err != nil {
clientLogger.Error(err, "Failed to create resource", "gvk", gvk, "namespace", namespace, "name", name)
return false, err
}
clientLogger.V(5).Info("Resource created", "gvk", gvk, "namespace", namespace, "name", name)
return true, nil
} else if err == nil {
switch obj.(type) {
Expand All @@ -117,39 +156,58 @@ func (c *Client) CreateOrUpdate(ctx context.Context, obj ctrlclient.Object) (mut
case *appsv1.StatefulSet:
calculateOpt = append(calculateOpt, patch.IgnoreVolumeClaimTemplateTypeMetaAndStatus())
}

result, err := patch.DefaultPatchMaker.Calculate(current, obj, calculateOpt...)
if err != nil {
clientLogger.Error(err, "failed to calculate patch to match objects, moving on to update")
clientLogger.Error(err, "Failed to calculate patch to match objects, moving to update", "gvk", gvk, "namespace", namespace, "name", name)
// if there is an error with matching, we still want to update
resourceVersion := current.(metav1.ObjectMetaAccessor).GetObjectMeta().GetResourceVersion()
obj.(metav1.ObjectMetaAccessor).GetObjectMeta().SetResourceVersion(resourceVersion)

if err := c.Client.Update(ctx, obj); err != nil {
clientLogger.Error(err, "Failed to update resource", "gvk", gvk, "namespace", namespace, "name", name)
return false, err
}
clientLogger.V(5).Info("Resource updated", "gvk", gvk, "namespace", namespace, "name", name)
return true, nil
}

if !result.IsEmpty() {
clientLogger.Info(
fmt.Sprintf("Resource update for object %s:%s", kinds, obj.(metav1.ObjectMetaAccessor).GetObjectMeta().GetName()),
"patch", string(result.Patch),
)
clientLogger.V(1).Info("Resource modified, updating", "gvk", gvk, "namespace", namespace, "name", name)
// ignore the update if the object is a secret
if _, ok := obj.(*corev1.Secret); !ok {
clientLogger.V(1).Info("Patch result", "gvk", gvk, "namespace", namespace, "name", name, "patch", string(result.Patch))
}

if err := patch.DefaultAnnotator.SetLastAppliedAnnotation(obj); err != nil {
clientLogger.Error(err, "failed to annotate modified object", "object", obj)
clientLogger.Error(err, "Failed to update object annotation, moving to update", "gvk", gvk, "namespace", namespace, "name", name)
}

resourceVersion := current.(metav1.ObjectMetaAccessor).GetObjectMeta().GetResourceVersion()
obj.(metav1.ObjectMetaAccessor).GetObjectMeta().SetResourceVersion(resourceVersion)

if err = c.Client.Update(ctx, obj); err != nil {
clientLogger.Error(err, "Failed to update resource", "gvk", gvk, "namespace", namespace, "name", name)
return false, err
}
return true, nil
}
clientLogger.V(1).Info(fmt.Sprintf("Skipping update for object %s:%s", kinds, obj.(metav1.ObjectMetaAccessor).GetObjectMeta().GetName()))

clientLogger.V(1).Info("Skipping update for object", "gvk", gvk, "namespace", namespace, "name", name)
}
return false, err
}

func GetObjectGVK(obj ctrlclient.Object) (*schema.GroupVersionKind, error) {
gvks, _, err := scheme.Scheme.ObjectKinds(obj)
if err != nil {
return nil, err
}

if len(gvks) == 0 {
return nil, fmt.Errorf("no GroupVersionKind found for object %T", obj)
}

gvk := gvks[0]

return &gvk, nil
}
2 changes: 1 addition & 1 deletion pkg/reconciler/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

var (
logger = ctrl.Log.WithName("common").WithName("reconciler")
logger = ctrl.Log.WithName("reconciler")
)

type ClusterReconciler interface {
Expand Down
11 changes: 8 additions & 3 deletions pkg/reconciler/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,19 @@ func (r *DeploymentReconciler) Reconcile(ctx context.Context) Result {
}

func (r *DeploymentReconciler) Ready(ctx context.Context) Result {
obj := appv1.Deployment{}

obj := appv1.Deployment{
ObjectMeta: r.GetObjectMeta(),
}
logger.V(1).Info("Checking deployment ready", "namespace", obj.Namespace, "name", obj.Name)
if err := r.Client.Get(ctx, &obj); err != nil {
return NewResult(true, 0, err)
}
if obj.Status.ReadyReplicas == *obj.Spec.Replicas {
logger.V(1).Info("Deployment is ready", "namespace", obj.Namespace, "name", obj.Name)
logger.Info("Deployment is ready", "namespace", obj.Namespace, "name", obj.Name, "replicas", *obj.Spec.Replicas, "readyReplicas", obj.Status.ReadyReplicas)
return NewResult(false, 0, nil)
}
logger.V(1).Info("Deployment is not ready", "namespace", obj.Namespace, "name", obj.Name)
logger.Info("Deployment is not ready", "namespace", obj.Namespace, "name", obj.Name, "replicas", *obj.Spec.Replicas, "readyReplicas", obj.Status.ReadyReplicas)
return NewResult(false, 5, nil)
}

Expand All @@ -65,5 +69,6 @@ func NewDeploymentReconciler(
options,
deployBuilder,
),
Options: options,
}
}
6 changes: 6 additions & 0 deletions pkg/reconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
type AnySpec any

type Reconciler interface {
GetName() string
GetNamespace() string
GetClient() *client.Client
GetCtrlClient() ctrlclient.Client
GetCtrlScheme() *runtime.Scheme
Expand All @@ -36,6 +38,10 @@ func (b *BaseReconciler[T]) GetName() string {
return b.Options.GetFullName()
}

func (b *BaseReconciler[T]) GetNamespace() string {
return b.Options.GetNamespace()
}

func (b *BaseReconciler[T]) GetCtrlClient() ctrlclient.Client {
return b.Client.GetCtrlClient()
}
Expand Down
16 changes: 12 additions & 4 deletions pkg/reconciler/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,18 @@ import (

"github.com/zncdatadev/superset-operator/pkg/builder"
"github.com/zncdatadev/superset-operator/pkg/client"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
)

var (
resourceLogger = ctrl.Log.WithName("reconciler").WithName("resource")
)

type ResourceReconciler[B builder.Builder] interface {
Reconciler
GetObjectMeta() metav1.ObjectMeta
GetBuilder() B
ResourceReconcile(ctx context.Context, resource ctrlclient.Object) Result
}
Expand All @@ -38,19 +44,21 @@ func NewGenericResourceReconciler[B builder.Builder](
}
}

func (r *GenericResourceReconciler[b]) GetObjectMeta() metav1.ObjectMeta {
return r.Builder.GetObjectMeta()
}

func (r *GenericResourceReconciler[B]) GetBuilder() B {
return r.Builder
}

func (r *GenericResourceReconciler[B]) ResourceReconcile(ctx context.Context, resource ctrlclient.Object) Result {

if err := ctrl.SetControllerReference(r.Client.OwnerReference, resource, r.GetCtrlScheme()); err != nil {
return NewResult(true, 0, err)
}

if mutation, err := r.Client.CreateOrUpdate(ctx, resource); err != nil {
resourceLogger.Error(err, "Failed to create or update resource", "name", resource.GetName(), "namespace", resource.GetNamespace(), "cluster", r.Options.GetClusterName())
return NewResult(true, 0, err)
} else if mutation {
resourceLogger.Info("Resource created or updated", "name", resource.GetName(), "namespace", resource.GetNamespace(), "cluster", r.Options.GetClusterName())
return NewResult(true, time.Second, nil)
}
return NewResult(false, 0, nil)
Expand Down
Loading
Loading