Skip to content

Commit

Permalink
internal/controller: refactor error handling in versioned flow (#262)
Browse files Browse the repository at this point in the history
  • Loading branch information
datdao authored Jan 23, 2025
1 parent b4d554b commit fab7c5b
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 57 deletions.
99 changes: 50 additions & 49 deletions internal/controller/atlasmigration_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,7 @@ func (r *AtlasMigrationReconciler) Reconcile(ctx context.Context, req ctrl.Reque
}
data, err := r.extractData(ctx, res)
if err != nil {
var reason = "ReadingMigrationData"
if e, ok := err.(interface{ Reason() string }); ok {
reason = e.Reason()
}
res.SetNotReady(reason, err.Error())
r.recordErrEvent(res, err)
return result(err)
return r.resultErr(res, err, "ReadingMigrationData")
}
// We need to update the ready condition immediately before doing
// any heavy jobs if the hash is different from the last applied.
Expand All @@ -155,16 +149,11 @@ func (r *AtlasMigrationReconciler) Reconcile(ctx context.Context, req ctrl.Reque
// spin up a dev-db and get the connection string.
data.DevURL, err = r.devDB.devURL(ctx, res, *data.URL)
if err != nil {
res.SetNotReady("GettingDevDB", err.Error())
return result(err)
return r.resultErr(res, err, "GettingDevDB")
}
}
// Reconcile given resource
err = r.reconcile(ctx, data, res)
if err != nil {
r.recordErrEvent(res, err)
}
return result(err)
return r.reconcile(ctx, data, res)
}

func (r *AtlasMigrationReconciler) readDirState(ctx context.Context, obj client.Object) (migrate.Dir, error) {
Expand Down Expand Up @@ -256,7 +245,7 @@ const (
)

// Reconcile the given AtlasMigration resource.
func (r *AtlasMigrationReconciler) reconcile(ctx context.Context, data *migrationData, res *dbv1alpha1.AtlasMigration) error {
func (r *AtlasMigrationReconciler) reconcile(ctx context.Context, data *migrationData, res *dbv1alpha1.AtlasMigration) (ctrl.Result, error) {
log := ctrl.Log.WithName("atlas_migration.reconcile")
// Create a working directory for the Atlas CLI
// The working directory contains the atlas.hcl config
Expand All @@ -266,49 +255,43 @@ func (r *AtlasMigrationReconciler) reconcile(ctx context.Context, data *migratio
atlasexec.WithMigrations(data.Dir),
)
if err != nil {
res.SetNotReady("ReadingMigrationData", err.Error())
return err
return r.resultErr(res, err, "ReadingMigrationData")
}
defer wd.Close()
c, err := r.atlasClient(wd.Path(), data.Cloud)
if err != nil {
return err
return r.resultErr(res, err, "CreatingAtlasClient")
}
var whoami *atlasexec.WhoAmI
switch whoami, err = c.WhoAmI(ctx); {
case errors.Is(err, atlasexec.ErrRequireLogin):
log.Info("the resource is not connected to Atlas Cloud")
if data.Config != nil {
err = errors.New("login is required to use custom atlas.hcl config")
res.SetNotReady("WhoAmI", err.Error())
r.recordErrEvent(res, err)
return err
return r.resultErr(res, err, "WhoAmI")
}
case err != nil:
res.SetNotReady("WhoAmI", err.Error())
r.recordErrEvent(res, err)
return err
return r.resultErr(res, err, "WhoAmI")
default:
log.Info("the resource is connected to Atlas Cloud", "org", whoami.Org)
}
log.Info("reconciling migration", "env", data.EnvName)
// Check if there are any pending migration files
status, err := c.MigrateStatus(ctx, &atlasexec.MigrateStatusParams{Env: data.EnvName, Vars: data.Vars})
if err != nil {
res.SetNotReady("Migrating", err.Error())
if isChecksumErr(err) {
return err
return r.resultErr(res, err, "Migrating")
}
return transient(err)
return r.resultCLIErr(res, transient(err), "Migrating")
}
switch {
case len(status.Pending) == 0 && len(status.Applied) > 0 && len(status.Available) < len(status.Applied):
if !data.MigrateDown {
res.SetNotReady("ProtectedFlowError", "Migrate down is not allowed")
return &ProtectedFlowError{
err = &ProtectedFlowError{
reason: "ProtectedFlowError",
msg: "migrate down is not allowed, set `migrateDown.allow` to true to allow downgrade",
}
return r.resultErr(res, err, "ProtectedFlowError")
}
// The downgrade is allowed, apply the last migration version
last := status.Available[len(status.Available)-1]
Expand All @@ -334,33 +317,28 @@ func (r *AtlasMigrationReconciler) reconcile(ctx context.Context, data *migratio
// then use it to downgrade.
current := fmt.Sprintf("migrations-%s", status.Current)
if err = wd.CopyFS(current, data.DirLatest); err != nil {
return err
return r.resultErr(res, err, "CopyingDirState")
}
params.DirURL = fmt.Sprintf("file://%s", current)
default:
return fmt.Errorf("unable to downgrade, no dir-state found")
return r.resultErr(res, errors.New("unable to downgrade, no dir-state found"), "Migrating")
}
run, err := c.MigrateDown(ctx, params)
if err != nil {
res.SetNotReady("Migrating", err.Error())
if !isSQLErr(err) {
err = transient(err)
}
return err
return r.resultCLIErr(res, err, "Migrating")
}
switch run.Status {
case StatePending:
res.SetNotReady("ApprovalPending", "Deployment is waiting for approval")
res.Status.ApprovalURL = run.URL
return transient(&ProtectedFlowError{
err = transient(&ProtectedFlowError{
reason: "ApprovalPending",
msg: fmt.Sprintf("plan approval pending, review here: %s", run.URL),
})
return r.resultErr(res, err, "ApprovalPending")
case StateAborted:
res.SetNotReady("PlanRejected", "Deployment is aborted")
res.Status.ApprovalURL = run.URL
// Migration is aborted, no need to reapply
return fmt.Errorf("plan rejected, review here: %s", run.URL)
return r.resultErr(res, fmt.Errorf("plan rejected, review here: %s", run.URL), "PlanRejected")
case StateApplied, StateApproved:
res.SetReady(dbv1alpha1.AtlasMigrationStatus{
ObservedHash: data.ObservedHash,
Expand Down Expand Up @@ -397,14 +375,10 @@ func (r *AtlasMigrationReconciler) reconcile(ctx context.Context, data *migratio
Vars: data.Vars,
})
if err != nil {
res.SetNotReady("Migrating", err.Error())
if !isSQLErr(err) {
err = transient(err)
}
return err
return r.resultCLIErr(res, err, "Migrating")
}
if len(reports) != 1 {
return fmt.Errorf("unexpected number of reports: %d", len(reports))
return r.resultErr(res, fmt.Errorf("unexpected number of reports: %d", len(reports)), "Migrating")
}
res.SetReady(dbv1alpha1.AtlasMigrationStatus{
ObservedHash: data.ObservedHash,
Expand All @@ -417,11 +391,10 @@ func (r *AtlasMigrationReconciler) reconcile(ctx context.Context, data *migratio
// Compress the migration directory then store it in the secret
// for later use when atlas runs the migration down.
if err = r.storeDirState(ctx, res, data.Dir); err != nil {
res.SetNotReady("StoringDirState", err.Error())
return err
return r.resultErr(res, err, "StoringDirState")
}
}
return nil
return ctrl.Result{}, nil
}

type ProtectedFlowError struct {
Expand Down Expand Up @@ -569,6 +542,34 @@ func (r *AtlasMigrationReconciler) recordErrEvent(res *dbv1alpha1.AtlasMigration
r.recorder.Event(res, corev1.EventTypeWarning, reason, strings.TrimSpace(err.Error()))
}

// resultErr marks res as not ready because of err, records an error event
// for it, and returns whatever result(err) yields.
//
// reason is the fallback condition reason. It is overridden when err, or any
// error in its wrap chain, exposes a Reason() string method. Errors that
// isConnectionErr reports as connection errors are wrapped with transient
// before being reported (presumably so the request is retried; confirm
// against the definitions of transient and result).
//
// err must be non-nil: its message is used as the condition message.
func (r *AtlasMigrationReconciler) resultErr(
	res *dbv1alpha1.AtlasMigration, err error, reason string,
) (ctrl.Result, error) {
	// errors.As walks the wrap chain, so a reason carried by a wrapped
	// error is still honoured; a plain type assertion would only look at
	// the outermost error.
	var withReason interface{ Reason() string }
	if errors.As(err, &withReason) {
		reason = withReason.Reason()
	}
	if isConnectionErr(err) {
		err = transient(err)
	}
	res.SetNotReady(reason, err.Error())
	r.recordErrEvent(res, err)
	return result(err)
}

// resultCLIErr marks res as not ready because of err, records an error event
// for it, and returns whatever result(err) yields. In this file it is used
// for errors coming back from Atlas client calls such as MigrateStatus and
// MigrateDown.
//
// reason is the fallback condition reason. It is overridden when err, or any
// error in its wrap chain, exposes a Reason() string method. Every error that
// isSQLErr does not report as an SQL error is wrapped with transient before
// being reported (presumably so the request is retried; confirm against the
// definitions of transient and result).
//
// err must be non-nil: its message is used as the condition message.
func (r *AtlasMigrationReconciler) resultCLIErr(
	res *dbv1alpha1.AtlasMigration, err error, reason string,
) (ctrl.Result, error) {
	// errors.As walks the wrap chain, so a reason carried by a wrapped
	// error is still honoured; a plain type assertion would only look at
	// the outermost error.
	var withReason interface{ Reason() string }
	if errors.As(err, &withReason) {
		reason = withReason.Reason()
	}
	if !isSQLErr(err) {
		err = transient(err)
	}
	res.SetNotReady(reason, err.Error())
	r.recordErrEvent(res, err)
	return result(err)
}

// Calculate the hash of the given data
func hashMigrationData(d *migrationData) (string, error) {
h := sha256.New()
Expand Down
14 changes: 7 additions & 7 deletions internal/controller/atlasmigration_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ func TestMigration_MigrateDown_Remote_Protected(t *testing.T) {
})
}
// No changes because the migration down is not allowed
assert(ctrl.Result{}, false, "ProtectedFlowError", "Migrate down is not allowed", "", "", "")
assert(ctrl.Result{}, false, "ProtectedFlowError", "migrate down is not allowed, set `migrateDown.allow` to true to allow downgrade", "", "", "")

obj = &dbv1alpha1.AtlasMigration{
ObjectMeta: meta,
Expand Down Expand Up @@ -362,8 +362,8 @@ func TestMigration_MigrateDown_Remote_Protected(t *testing.T) {
URL: "THIS_IS_DEPLOYMENT_URL",
}
// Reconcile again
assert(ctrl.Result{RequeueAfter: 5 * time.Second}, false, "ApprovalPending", "Deployment is waiting for approval", "", "THIS_IS_DEPLOYMENT_URL", "")
assert(ctrl.Result{RequeueAfter: 5 * time.Second}, false, "ApprovalPending", "Deployment is waiting for approval", "", "THIS_IS_DEPLOYMENT_URL", "")
assert(ctrl.Result{RequeueAfter: 5 * time.Second}, false, "ApprovalPending", "plan approval pending, review here: THIS_IS_DEPLOYMENT_URL", "", "THIS_IS_DEPLOYMENT_URL", "")
assert(ctrl.Result{RequeueAfter: 5 * time.Second}, false, "ApprovalPending", "plan approval pending, review here: THIS_IS_DEPLOYMENT_URL", "", "THIS_IS_DEPLOYMENT_URL", "")

mockExec.down.res = &atlasexec.MigrateDown{
Current: "2",
Expand Down Expand Up @@ -462,7 +462,7 @@ func TestMigration_MigrateDown_Local(t *testing.T) {
URL: "",
}
// No changes because the migration down is not allowed
assert(ctrl.Result{}, false, "ProtectedFlowError", "Migrate down is not allowed", "", "", "")
assert(ctrl.Result{}, false, "ProtectedFlowError", "migrate down is not allowed, set `migrateDown.allow` to true to allow downgrade", "", "", "")

h.patch(t, &dbv1alpha1.AtlasMigration{
ObjectMeta: meta,
Expand Down Expand Up @@ -687,7 +687,7 @@ func TestReconcile_reconcile(t *testing.T) {
}
md, err := tt.r.extractData(context.Background(), res)
require.NoError(t, err)
err = tt.r.reconcile(context.Background(), md, res)
_, err = tt.r.reconcile(context.Background(), md, res)
require.NoError(t, err)
require.EqualValues(t, "20230412003626", res.Status.LastAppliedVersion)
}
Expand Down Expand Up @@ -746,7 +746,7 @@ func TestReconcile_reconcile_upToDate(t *testing.T) {
},
})
require.NoError(t, err)
err = tt.r.reconcile(context.Background(), md, res)
_, err = tt.r.reconcile(context.Background(), md, res)
require.NoError(t, err)
require.EqualValues(t, "20230412003626", res.Status.LastAppliedVersion)
}
Expand All @@ -770,7 +770,7 @@ func TestReconcile_reconcile_baseline(t *testing.T) {
}
md, err := tt.r.extractData(context.Background(), res)
require.NoError(t, err)
err = tt.r.reconcile(context.Background(), md, res)
_, err = tt.r.reconcile(context.Background(), md, res)
require.NoError(t, err)
require.EqualValues(t, "20230412003628", res.Status.LastAppliedVersion)

Expand Down
2 changes: 1 addition & 1 deletion test/e2e/testscript/migration-mysql.txtar
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ kubectl create configmap migration-dir --from-file=migrations-v1 --dry-run=clien
stdin stdout
kubectl apply -f -
# Expect the migration to fail
kubectl wait --timeout=500s --for=jsonpath='{.status.conditions[*].message}'='"Migrate down is not allowed"' AtlasMigration/mysql
kubectl wait --timeout=500s --for=jsonpath='{.status.conditions[*].message}'='"migrate down is not allowed, set `migrateDown.allow` to true to allow downgrade"' AtlasMigration/mysql
# Patch the migration to allow down migration
kubectl patch AtlasMigration/mysql --type merge --patch-file ./migration-patch-down.yaml
kubectl wait --timeout=500s --for=condition=ready AtlasMigration/mysql
Expand Down

0 comments on commit fab7c5b

Please sign in to comment.