diff --git a/internal/controller/atlasmigration_controller.go b/internal/controller/atlasmigration_controller.go index 3f9b5ac..0f3106c 100644 --- a/internal/controller/atlasmigration_controller.go +++ b/internal/controller/atlasmigration_controller.go @@ -134,7 +134,7 @@ func (r *AtlasMigrationReconciler) Reconcile(ctx context.Context, req ctrl.Reque } res.SetNotReady(reason, err.Error()) r.recordErrEvent(res, err) - return result(err) + return xresult(err) } // We need to update the ready condition immediately before doing // any heavy jobs if the hash is different from the last applied. @@ -156,7 +156,7 @@ func (r *AtlasMigrationReconciler) Reconcile(ctx context.Context, req ctrl.Reque data.DevURL, err = r.devDB.devURL(ctx, res, *data.URL) if err != nil { res.SetNotReady("GettingDevDB", err.Error()) - return result(err) + return xresult(err) } } // Reconcile given resource @@ -164,7 +164,7 @@ func (r *AtlasMigrationReconciler) Reconcile(ctx context.Context, req ctrl.Reque if err != nil { r.recordErrEvent(res, err) } - return result(err) + return xresult(err) } func (r *AtlasMigrationReconciler) readDirState(ctx context.Context, obj client.Object) (migrate.Dir, error) { diff --git a/internal/controller/atlasschema_controller.go b/internal/controller/atlasschema_controller.go index faa61f8..f3a5044 100644 --- a/internal/controller/atlasschema_controller.go +++ b/internal/controller/atlasschema_controller.go @@ -30,6 +30,7 @@ import ( "time" corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" @@ -43,6 +44,8 @@ import ( "github.com/ariga/atlas-operator/api/v1alpha1" dbv1alpha1 "github.com/ariga/atlas-operator/api/v1alpha1" "github.com/ariga/atlas-operator/internal/controller/watch" + "github.com/ariga/atlas-operator/internal/result" + "github.com/ariga/atlas-operator/internal/status" "github.com/hashicorp/hcl/v2/hclwrite" "github.com/zclconf/go-cty/cty" ) @@ -108,7 +111,10 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) res = &dbv1alpha1.AtlasSchema{} ) if err = r.Get(ctx, req.NamespacedName, res); err != nil { - return ctrl.Result{}, client.IgnoreNotFound(err) + if apierrors.IsNotFound(err) { + return result.OK() + } + return result.Failed() } defer func() { // At the end of reconcile, update the status of the resource base on the error @@ -127,31 +133,29 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) }() // When the resource is first created, create the "Ready" condition. if len(res.Status.Conditions) == 0 { - res.SetNotReady("Reconciling", "Reconciling") - return ctrl.Result{Requeue: true}, nil + return status.Update(ctx, r.Client.Status(), res, statusOptions(). + withReconciling("Reconciling")) } data, err := r.extractData(ctx, res) if err != nil { - res.SetNotReady("ReadSchema", err.Error()) - r.recordErrEvent(res, err) - return result(err) + return status.Update(ctx, r.Client.Status(), res, statusOptions(). + withNotReady("ReadSchema", err)) } if data.hasTargets() { - res.SetNotReady("ReadSchema", "Multiple targets are not supported") - return ctrl.Result{}, nil + return status.Update(ctx, r.Client.Status(), res, statusOptions(). + withNotReady("ReadSchema", errors.New("multiple targets are not supported"))) } hash, err := data.hash() if err != nil { - res.SetNotReady("CalculatingHash", err.Error()) - r.recordErrEvent(res, err) - return result(err) + return status.Update(ctx, r.Client.Status(), res, statusOptions(). + withNotReady("CalculatingHash", err)) } // We need to update the ready condition immediately before doing // any heavy jobs if the hash is different from the last applied. // This is to ensure that other tools know we are still applying the changes. if res.IsReady() && res.IsHashModified(hash) { - res.SetNotReady("Reconciling", "current schema does not match last applied") - return ctrl.Result{Requeue: true}, nil + return status.Update(ctx, r.Client.Status(), res, statusOptions(). + withReconciling("current schema does not match last applied")) } // ==================================================== // Starting area to handle the heavy jobs. @@ -162,8 +166,8 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) // spin up a dev-db and get the connection string. data.DevURL, err = r.devDB.devURL(ctx, res, *data.URL) if err != nil { - res.SetNotReady("GettingDevDB", err.Error()) - return result(err) + return status.Update(ctx, r.Client.Status(), res, statusOptions(). + withNotReady("GettingDevDB", err)) } } opts := []atlasexec.Option{atlasexec.WithAtlasHCL(data.render)} @@ -188,40 +192,34 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) return err } if err != nil { - res.SetNotReady("CreatingWorkingDir", err.Error()) - r.recordErrEvent(res, err) - return result(err) + return status.Update(ctx, r.Client.Status(), res, statusOptions(). + withNotReady("CreatingWorkingDir", err)) } defer wd.Close() cli, err := r.atlasClient(wd.Path(), data.Cloud) if err != nil { - res.SetNotReady("CreatingAtlasClient", err.Error()) - r.recordErrEvent(res, err) - return result(err) + return status.Update(ctx, r.Client.Status(), res, statusOptions(). + withNotReady("CreatingAtlasClient", err)) } var whoami *atlasexec.WhoAmI switch whoami, err = cli.WhoAmI(ctx); { case errors.Is(err, atlasexec.ErrRequireLogin): log.Info("the resource is not connected to Atlas Cloud") if data.Config != nil { - err = errors.New("login is required to use custom atlas.hcl config") - res.SetNotReady("WhoAmI", err.Error()) - r.recordErrEvent(res, err) - return result(err) + return status.Update(ctx, r.Client.Status(), res, statusOptions(). + withNotReady("WhoAmI", errors.New("login is required to use custom atlas.hcl config"))) } case err != nil: - res.SetNotReady("WhoAmI", err.Error()) - r.recordErrEvent(res, err) - return result(err) + return status.Update(ctx, r.Client.Status(), res, statusOptions(). + withNotReady("WhoAmI", err)) default: log.Info("the resource is connected to Atlas Cloud", "org", whoami.Org) } var reports []*atlasexec.SchemaApply shouldLint, err := data.shouldLint() if err != nil { - res.SetNotReady("LintPolicyError", err.Error()) - r.recordErrEvent(res, err) - return result(err) + return status.Update(ctx, r.Client.Status(), res, statusOptions(). + withNotReady("LintPolicyError", err)) } switch desiredURL := data.Desired.String(); { // The resource is connected to Atlas Cloud. @@ -231,7 +229,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) m.setLintReview(dbv1alpha1.LintReviewError, false) }) if err != nil { - return result(err) + return xresult(err) } params := &atlasexec.SchemaApplyParams{ Env: data.EnvName, @@ -268,7 +266,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) err = transient(err) } r.recordErrEvent(res, err) - return result(err) + return xresult(err) } state, err := cli.SchemaPush(ctx, &atlasexec.SchemaPushParams{ Env: data.EnvName, @@ -285,7 +283,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) err = transient(err) } r.recordErrEvent(res, err) - return result(err) + return xresult(err) } desiredURL = state.URL } @@ -315,7 +313,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) err = transient(err) } r.recordErrEvent(res, err) - return result(err) + return xresult(err) default: log.Info("created a new schema plan", "plan", plan.File.URL, "desiredURL", desiredURL) res.Status.PlanURL = plan.File.URL @@ -342,7 +340,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) err = transient(err) } r.recordErrEvent(res, err) - return result(err) + return xresult(err) // There are multiple pending plans. This is an unexpected state. case len(plans) > 1: planURLs := make([]string, 0, len(plans)) @@ -355,7 +353,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) r.recorder.Event(res, corev1.EventTypeWarning, reason, msg) err = errors.New(msg) r.recordErrEvent(res, err) - return result(err) + return xresult(err) // There are no pending plans, but Atlas has been asked to review the changes ALWAYS. case len(plans) == 0 && data.Policy.Lint.Review == dbv1alpha1.LintReviewAlways: // Create a plan for the pending changes. @@ -390,7 +388,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) if err = editAtlasHCL(func(m *managedData) { m.enableDestructive(true) }); err != nil { - return result(err) + return xresult(err) } err = r.lint(ctx, wd, data, data.Vars) switch d := (*destructiveErr)(nil); { @@ -408,13 +406,13 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) err = transient(err) } r.recordErrEvent(res, err) - return result(err) + return xresult(err) } // Revert the destructive linting policy back to the original value. if err = editAtlasHCL(func(m *managedData) { m.Policy.Lint.Destructive.Error = false }); err != nil { - return result(err) + return xresult(err) } reports, err = cli.SchemaApplySlice(ctx, &atlasexec.SchemaApplyParams{ Env: data.EnvName, @@ -433,7 +431,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) err = transient(err) } r.recordErrEvent(res, err) - return result(err) + return xresult(err) } reports, err = cli.SchemaApplySlice(ctx, &atlasexec.SchemaApplyParams{ Env: data.EnvName, @@ -459,14 +457,14 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) err = transient(err) } r.recordErrEvent(res, err) - return result(err) + return xresult(err) } s := dbv1alpha1.AtlasSchemaStatus{ LastApplied: time.Now().Unix(), ObservedHash: hash, } if len(reports) != 1 { - return result(fmt.Errorf("unexpected number of schema reports: %d", len(reports))) + return xresult(fmt.Errorf("unexpected number of schema reports: %d", len(reports))) } log.Info("schema changes are applied", "applied", len(reports[0].Changes.Applied)) // Truncate the applied and pending changes to 1024 bytes. diff --git a/internal/controller/atlasschema_status.go b/internal/controller/atlasschema_status.go new file mode 100644 index 0000000..46df9dd --- /dev/null +++ b/internal/controller/atlasschema_status.go @@ -0,0 +1,165 @@ +// Copyright 2025 The Atlas Operator Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controller + +import ( + "encoding/json" + "fmt" + + "ariga.io/atlas-go-sdk/atlasexec" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "github.com/ariga/atlas-operator/api/v1alpha1" + "github.com/ariga/atlas-operator/internal/result" + "github.com/ariga/atlas-operator/internal/status" +) + +type optionBuilder struct { + opts []status.Option[*v1alpha1.AtlasSchema] +} + +// statusOptions returns an initialized optionBuilder +func statusOptions() *optionBuilder { + return &optionBuilder{} +} + +// GetOptions implements the OptionBuilder interface +func (o *optionBuilder) GetOptions() []status.Option[*v1alpha1.AtlasSchema] { + return o.opts +} + +func (o *optionBuilder) withCondition(condition metav1.Condition) *optionBuilder { + o.opts = append(o.opts, conditionOption{cond: condition}) + return o +} + +func (o *optionBuilder) withObservedHash(h string) *optionBuilder { + o.opts = append(o.opts, observedHashOption{hash: h}) + return o +} + +func (o *optionBuilder) withPlanFile(f *atlasexec.SchemaPlanFile) *optionBuilder { + o.opts = append(o.opts, planFileOption{file: f}) + return o +} + +func (o *optionBuilder) withReport(p *atlasexec.SchemaApply) *optionBuilder { + // Truncate the applied and pending changes to 1024 bytes. + p.Changes.Applied = truncateSQL(p.Changes.Applied, sqlLimitSize) + p.Changes.Pending = truncateSQL(p.Changes.Pending, sqlLimitSize) + o.opts = append(o.opts, reportOption{report: p}) + if plan := p.Plan; plan != nil { + return o.withPlanFile(plan.File) + } + return o +} + +type observedHashOption struct { + hash string +} + +func (o observedHashOption) ApplyOption(obj *v1alpha1.AtlasSchema) { + obj.Status.ObservedHash = o.hash +} + +func (o observedHashOption) GetResult() (reconcile.Result, error) { + return result.OK() +} + +type reportOption struct { + report *atlasexec.SchemaApply +} + +func (m reportOption) ApplyOption(obj *v1alpha1.AtlasSchema) { + var msg string + if j, err := json.Marshal(m.report); err != nil { + msg = fmt.Sprintf("Error marshalling apply response: %v", err) + } else { + msg = fmt.Sprintf("The schema has been applied successfully. Apply response: %s", j) + } + obj.Status.LastApplied = m.report.End.Unix() + meta.SetStatusCondition(&obj.Status.Conditions, metav1.Condition{ + Type: "Ready", + Status: metav1.ConditionTrue, + Reason: "Applied", + Message: msg, + }) +} + +func (m reportOption) GetResult() (reconcile.Result, error) { + return result.OK() +} + +func (o *optionBuilder) withReconciling(message string) *optionBuilder { + o.opts = append(o.opts, conditionOption{ + cond: metav1.Condition{ + Type: "Ready", + Status: metav1.ConditionFalse, + Reason: "Reconciling", + Message: message, + }, + err: &result.TransientError{}, + }) + return o +} + +func (o *optionBuilder) withNotReady(reason string, err error) *optionBuilder { + err = IgnoreNonTransient(err) + o.opts = append(o.opts, conditionOption{ + cond: metav1.Condition{ + Type: "Ready", + Status: metav1.ConditionFalse, + Reason: reason, + Message: err.Error(), + }, + err: err, + }) + return o +} + +type planFileOption struct { + file *atlasexec.SchemaPlanFile +} + +func (m planFileOption) ApplyOption(obj *v1alpha1.AtlasSchema) { + obj.Status.PlanURL = m.file.URL + obj.Status.PlanLink = m.file.Link +} + +func (m planFileOption) GetResult() (reconcile.Result, error) { + return result.OK() +} + +type conditionOption struct { + cond metav1.Condition + err error +} + +func (m conditionOption) ApplyOption(obj *v1alpha1.AtlasSchema) { + meta.SetStatusCondition(&obj.Status.Conditions, m.cond) +} + +func (m conditionOption) GetResult() (reconcile.Result, error) { + return result.Transient(m.err) +} + +func IgnoreNonTransient(err error) error { + if err == nil || result.IsTransient(err) { + return nil + } + return err +} diff --git a/internal/controller/common.go b/internal/controller/common.go index 4c1ba89..edb9f17 100644 --- a/internal/controller/common.go +++ b/internal/controller/common.go @@ -185,7 +185,7 @@ func isTransient(err error) bool { // Permanent errors are not returned as errors because they cause // the controller to requeue indefinitely. Instead, they should be // reported as a status condition. -func result(err error) (r ctrl.Result, _ error) { +func xresult(err error) (r ctrl.Result, _ error) { if t := (*transientError)(nil); errors.As(err, &t) { r.RequeueAfter = t.after } diff --git a/internal/result/errors.go b/internal/result/errors.go new file mode 100644 index 0000000..c7c4000 --- /dev/null +++ b/internal/result/errors.go @@ -0,0 +1,36 @@ +package result + +import ( + "errors" +) + +// TransientError is an error that should be retried. +type TransientError struct { + Err error + After int +} + +// Error implements the error interface +func (t *TransientError) Error() string { + return t.Err.Error() +} + +// Unwrap implements the errors.Wrapper interface +func (t *TransientError) Unwrap() error { + return t.Err +} + +// TransientErrorAfter wraps an error to indicate that it should be retried after +// the given duration. +func TransientErrorAfter(err error, after int) error { + if err == nil { + return nil + } + return &TransientError{Err: err, After: after} +} + +// IsTransient checks if the error is transient +func IsTransient(err error) bool { + var t *TransientError + return errors.As(err, &t) +} diff --git a/internal/result/result.go b/internal/result/result.go new file mode 100644 index 0000000..4136d73 --- /dev/null +++ b/internal/result/result.go @@ -0,0 +1,52 @@ +// Copyright 2025 The Atlas Operator Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package result + +import ( + "errors" + "time" + + ctrl "sigs.k8s.io/controller-runtime" +) + +// OK returns a successful result +func OK() (ctrl.Result, error) { + return ctrl.Result{}, nil +} + +// Failed returns a failed result +func Failed() (ctrl.Result, error) { + return Retry(0) +} + +// Retry requeues the request after the specified number of seconds +func Retry(after int) (ctrl.Result, error) { + return ctrl.Result{ + Requeue: true, + RequeueAfter: time.Second * time.Duration(after), + }, nil +} + +// Transient checks if the error is transient and returns a result +// that indicates whether the request should be retried. +func Transient(err error) (ctrl.Result, error) { + if t := (*TransientError)(nil); errors.As(err, &t) { + return Retry(t.After) + } + // Permanent errors are not returned as errors because they cause + // the controller to requeue indefinitely. Instead, they should be + // reported as a status condition. + return OK() +} diff --git a/internal/status/status.go b/internal/status/status.go new file mode 100644 index 0000000..e6d4364 --- /dev/null +++ b/internal/status/status.go @@ -0,0 +1,58 @@ +// Copyright 2025 The Atlas Operator Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package status + +import ( + "context" + + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type ( + // OptionBuilder is an interface that can be implemented + // by any type that can provide a list of options + OptionBuilder[T any] interface { + GetOptions() []Option[T] + } + // Option is an interface that can be implemented by any type + // that can apply an option to a resource and return a result + Option[T any] interface { + ApplyOption(o T) + GetResult() (ctrl.Result, error) + } +) + +// Update takes the options provided by the given option builder, applies them all and then updates the resource +func Update[T client.Object](ctx context.Context, sw client.StatusWriter, obj T, b OptionBuilder[T]) (r ctrl.Result, err error) { + opts := b.GetOptions() + for _, o := range opts { + o.ApplyOption(obj) + } + if err := sw.Update(ctx, obj); err != nil { + return ctrl.Result{}, err + } + for _, o := range opts { + if r, err = o.GetResult(); err != nil { + return r, err + } + } + for _, o := range opts { + if r, _ := o.GetResult(); r.Requeue || r.RequeueAfter > 0 { + return r, nil + } + } + return ctrl.Result{}, nil +}