diff --git a/internal/controller/atlasschema_controller.go b/internal/controller/atlasschema_controller.go index 420886a..301bfa6 100644 --- a/internal/controller/atlasschema_controller.go +++ b/internal/controller/atlasschema_controller.go @@ -17,8 +17,6 @@ package controller import ( "bytes" "context" - "crypto/sha256" - "encoding/hex" "errors" "fmt" "io" @@ -140,23 +138,14 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) res.SetNotReady("ReadSchema", "Multiple targets are not supported") return ctrl.Result{}, nil } - hash, err := data.hash() - if err != nil { - res.SetNotReady("CalculatingHash", err.Error()) - r.recordErrEvent(res, err) - return result(err) - } - // We need to update the ready condition immediately before doing - // any heavy jobs if the hash is different from the last applied. - // This is to ensure that other tools know we are still applying the changes. - if res.IsReady() && res.IsHashModified(hash) { - res.SetNotReady("Reconciling", "current schema does not match last applied") - return ctrl.Result{Requeue: true}, nil + opts := []atlasexec.Option{atlasexec.WithAtlasHCL(data.render)} + if u := data.Desired; u != nil && u.Scheme == dbv1alpha1.SchemaTypeFile { + // Write the schema file to the working directory. + opts = append(opts, func(ce *atlasexec.WorkingDir) error { + _, err := ce.WriteFile(filepath.Join(u.Host, u.Path), data.schema) + return err + }) } - // ==================================================== - // Starting area to handle the heavy jobs. - // Below this line is the main logic of the controller. - // ==================================================== if !data.hasDevURL() && data.URL != nil { // The user has not specified an URL for dev-db, // spin up a dev-db and get the connection string. @@ -166,17 +155,15 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) return result(err) } } - opts := []atlasexec.Option{atlasexec.WithAtlasHCL(data.render)} - if u := data.Desired; u != nil && u.Scheme == dbv1alpha1.SchemaTypeFile { - // Write the schema file to the working directory. - opts = append(opts, func(ce *atlasexec.WorkingDir) error { - _, err := ce.WriteFile(filepath.Join(u.Host, u.Path), data.schema) - return err - }) - } // Create a working directory for the Atlas CLI // The working directory contains the atlas.hcl config. wd, err := atlasexec.NewWorkingDir(opts...) + if err != nil { + res.SetNotReady("CreatingWorkingDir", err.Error()) + r.recordErrEvent(res, err) + return result(err) + } + defer wd.Close() // This function will be used to edit and re-render the atlas.hcl file in the working directory. editAtlasHCL := func(fn func(m *managedData)) error { fn(data) @@ -187,18 +174,38 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) _, err = wd.WriteFile("atlas.hcl", buf.Bytes()) return err } + cli, err := r.atlasClient(wd.Path(), data.Cloud) if err != nil { - res.SetNotReady("CreatingWorkingDir", err.Error()) + res.SetNotReady("CreatingAtlasClient", err.Error()) r.recordErrEvent(res, err) return result(err) } - defer wd.Close() - cli, err := r.atlasClient(wd.Path(), data.Cloud) + // Calculate the hash of the current schema. + hash, err := cli.SchemaInspect(ctx, &atlasexec.SchemaInspectParams{ + Env: data.EnvName, + URL: data.targetURL(), + Format: `{{ .Hash }}`, + Vars: data.Vars, + }) if err != nil { - res.SetNotReady("CreatingAtlasClient", err.Error()) + res.SetNotReady("CalculatingHash", err.Error()) r.recordErrEvent(res, err) + if isConnectionErr(err) { + err = transient(err) + } return result(err) } + // We need to update the ready condition immediately before doing + // any heavy jobs if the hash is different from the last applied. + // This is to ensure that other tools know we are still applying the changes. + if res.IsReady() && res.IsHashModified(hash) { + res.SetNotReady("Reconciling", "current schema does not match last applied") + return ctrl.Result{Requeue: true}, nil + } + // ==================================================== + // Starting area to handle the heavy jobs. + // Below this line is the main logic of the controller. + // ==================================================== var whoami *atlasexec.WhoAmI switch whoami, err = cli.WhoAmI(ctx); { case errors.Is(err, atlasexec.ErrRequireLogin): @@ -251,9 +258,6 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) // And the schema is available for the Atlas CLI (on local machine) // to modify or approve the changes. if data.Desired.Scheme == dbv1alpha1.SchemaTypeFile { - log.Info("schema is a file, pushing the schema to Atlas Cloud") - // Using hash of desired state as the tag for the schema. - // This ensures push is idempotent. tag, err := cli.SchemaInspect(ctx, &atlasexec.SchemaInspectParams{ Env: data.EnvName, URL: desiredURL, @@ -270,6 +274,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) r.recordErrEvent(res, err) return result(err) } + log.Info("schema is a file, pushing the schema to Atlas Cloud") state, err := cli.SchemaPush(ctx, &atlasexec.SchemaPushParams{ Env: data.EnvName, Name: path.Join(repo.Host, repo.Path), @@ -744,17 +749,6 @@ func (d *managedData) hasLintReview() bool { return lint.Body().GetAttribute("review") != nil } -// hash returns the sha256 hash of the desired. -func (d *managedData) hash() (string, error) { - h := sha256.New() - if len(d.schema) > 0 { - h.Write([]byte(d.schema)) - } else { - h.Write([]byte(d.Desired.String())) - } - return hex.EncodeToString(h.Sum(nil)), nil -} - func (d *managedData) repoURL() *url.URL { switch { // The user has provided the repository name. @@ -773,6 +767,17 @@ func (d *managedData) repoURL() *url.URL { } } +// targetURL returns the target URL for the environment. +func (d *managedData) targetURL() string { + if d.Config != nil { + env := searchBlock(d.Config.Body(), hclwrite.NewBlock("env", []string{d.EnvName})) + if env != nil && env.Body().GetAttribute("src") != nil { + return "env://src" + } + } + return "env://schema.src" +} + // render renders the atlas.hcl template. // // The template is used by the Atlas CLI to apply the schema. @@ -857,6 +862,10 @@ func (d *managedData) asBlocks() []*hclwrite.Block { env := hclwrite.NewBlock("env", []string{d.EnvName}) blocks = append(blocks, env) envBody := env.Body() + if d.Desired != nil { + schema := envBody.AppendNewBlock("schema", nil).Body() + schema.SetAttributeValue("src", cty.StringVal(d.Desired.String())) + } if d.URL != nil { envBody.SetAttributeValue("url", cty.StringVal(d.URL.String())) } diff --git a/internal/controller/atlasschema_controller_test.go b/internal/controller/atlasschema_controller_test.go index 54ca3a7..d06e6c2 100644 --- a/internal/controller/atlasschema_controller_test.go +++ b/internal/controller/atlasschema_controller_test.go @@ -77,7 +77,7 @@ func TestReconcile_ReadyButDiff(t *testing.T) { Spec: dbv1alpha1.AtlasSchemaSpec{ Schema: dbv1alpha1.Schema{SQL: "create table foo (id int primary key);"}, TargetSpec: dbv1alpha1.TargetSpec{ - URL: "mysql://root:password@localhost:3306/test", + URL: "sqlite://file?mode=memory", }, }, Status: dbv1alpha1.AtlasSchemaStatus{ @@ -442,7 +442,7 @@ func TestBadSQL(t *testing.T) { cont := tt.cond() require.EqualValues(t, schemaReadyCond, cont.Type) require.EqualValues(t, metav1.ConditionFalse, cont.Status) - require.EqualValues(t, "LintPolicyError", cont.Reason) + require.EqualValues(t, "CalculatingHash", cont.Reason) require.Contains(t, cont.Message, "executing statement:") } @@ -500,6 +500,9 @@ func TestConfigTemplate(t *testing.T) { err := data.render(&buf) require.NoError(t, err) expected := `env "kubernetes" { + schema { + src = "file://schema.sql" + } url = "mysql://root:password@localhost:3306/test" dev = "mysql://root:password@localhost:3306/dev" schemas = ["foo", "bar"] @@ -573,6 +576,9 @@ env "kubernetes" { dev = "mysql://root:password@localhost:3306/dev" schemas = ["foo", "bar"] url = "mysql://root:password@localhost:3306/test" + schema { + src = "file://schema.sql" + } } ` require.EqualValues(t, expected, buf.String()) @@ -599,6 +605,9 @@ env { dev = "mysql://root:password@localhost:3306/dev" schemas = ["foo", "bar"] url = "mysql://root:password@localhost:3306/test" + schema { + src = "file://schema.sql" + } }` require.EqualValues(t, expected, buf.String()) } diff --git a/internal/controller/common.go b/internal/controller/common.go index 4c1ba89..d8ffad7 100644 --- a/internal/controller/common.go +++ b/internal/controller/common.go @@ -152,6 +152,15 @@ func isChecksumErr(err error) bool { return strings.Contains(err.Error(), "checksum mismatch") } +// isConnectionErr returns true if the error is a connection error. +func isConnectionErr(err error) bool { + if err == nil { + return false + } + return strings.Contains(err.Error(), "connection timed out") || + strings.Contains(err.Error(), "connection refused") +} + // transientError is an error that should be retried. type transientError struct { err error diff --git a/test/e2e/testscript/schema-hash-configmap.txtar b/test/e2e/testscript/schema-hash-configmap.txtar new file mode 100644 index 0000000..1a185fb --- /dev/null +++ b/test/e2e/testscript/schema-hash-configmap.txtar @@ -0,0 +1,129 @@ +env DB_URL=mysql://root:pass@mysql.${NAMESPACE}:3306/myapp +kubectl apply -f database.yaml +kubectl create secret generic db-creds --from-literal=url=${DB_URL} + +# Wait for the first pod created +kubectl-wait-available deploy/mysql +# Wait for the DB ready before creating the schema +kubectl-wait-ready -l app=mysql pods + +# Create the configmap to store the schema.sql +kubectl create configmap mysql-schema --from-file=./schema-v1 --dry-run=client -o yaml +stdin stdout +kubectl apply -f - + +# Create the schema +kubectl apply -f schema.yaml +kubectl-wait-ready AtlasSchema/mysql + +kubectl get -o jsonpath --template='{.status.observed_hash}' AtlasSchema/mysql +stdout oAoRLC2AXyGha6pKDollSqBB5ovjB\+qK78aAN9dkOow\= + +# Update the configmap with the new schema +kubectl create configmap mysql-schema --from-file=./schema-v2 --dry-run=client -o yaml +stdin stdout +kubectl apply -f - + +# Ensure the controller is aware of the change +kubectl wait --for=condition=ready=false --timeout=500s AtlasSchema/mysql +kubectl-wait-ready AtlasSchema/mysql + +# Hash should be updated +kubectl get -o jsonpath --template='{.status.observed_hash}' AtlasSchema/mysql +stdout mlBKa2H4Mt7J8uStzuFj6Ps0XRIB9z3EuZPfOw6BeIA\= + +-- schema-v1/schema.sql -- +create table users ( + id int not null auto_increment, + name varchar(255) not null, + email varchar(255) unique not null, + short_bio varchar(255) not null, + primary key (id) +); +-- schema-v2/schema.sql -- +create table users ( + id int not null auto_increment, + name varchar(255) not null, + email varchar(255) unique not null, + short_bio varchar(255) not null, + phone varchar(255) not null, + primary key (id) +); +-- schema.yaml -- +apiVersion: db.atlasgo.io/v1alpha1 +kind: AtlasSchema +metadata: + name: mysql +spec: + urlFrom: + secretKeyRef: + name: db-creds + key: url + schema: + configMapKeyRef: + key: schema.sql + name: mysql-schema +-- database.yaml -- +apiVersion: v1 +kind: Service +metadata: + name: mysql +spec: + selector: + app: mysql + ports: + - name: mysql + port: 3306 + targetPort: mysql + - name: mysql-dev + port: 3307 + targetPort: mysql-dev + type: ClusterIP +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: mysql +spec: + selector: + matchLabels: + app: mysql + replicas: 1 + template: + metadata: + labels: + app: mysql + spec: + containers: + - name: mysql + image: mysql:latest + env: + - name: MYSQL_ROOT_PASSWORD + value: pass + - name: MYSQL_DATABASE + value: myapp + ports: + - containerPort: 3306 + name: mysql + startupProbe: + exec: + command: [ "mysql", "-ppass", "-h", "127.0.0.1", "-e", "SELECT 1" ] + failureThreshold: 30 + periodSeconds: 10 + - name: mysql-dev + image: mysql:latest + env: + - name: MYSQL_ROOT_PASSWORD + value: pass + - name: MYSQL_DATABASE + value: myapp + - name: MYSQL_TCP_PORT + value: "3307" + ports: + - containerPort: 3307 + name: mysql-dev + startupProbe: + exec: + command: [ "mysql", "-ppass", "-h", "127.0.0.1", "-e", "SELECT 1" ] + failureThreshold: 30 + periodSeconds: 10 diff --git a/test/e2e/testscript/schema-hash.txtar b/test/e2e/testscript/schema-hash.txtar new file mode 100644 index 0000000..22d30cb --- /dev/null +++ b/test/e2e/testscript/schema-hash.txtar @@ -0,0 +1,139 @@ +env DB_URL=mysql://root:pass@mysql.${NAMESPACE}:3306/myapp +kubectl apply -f database.yaml +kubectl create secret generic db-creds --from-literal=url=${DB_URL} + +# Wait for the first pod created +kubectl-wait-available deploy/mysql +# Wait for the DB ready before creating the schema +kubectl-wait-ready -l app=mysql pods + +# Create the schema +kubectl apply -f schema.yaml +kubectl-wait-ready AtlasSchema/mysql + +kubectl get -o jsonpath --template='{.status.observed_hash}' AtlasSchema/mysql +stdout oAoRLC2AXyGha6pKDollSqBB5ovjB\+qK78aAN9dkOow\= + +# Update the configmap with the new schema +kubectl patch -f schema.yaml --type merge --patch-file schema-v2-patch.yaml + +# Ensure the controller is aware of the change +kubectl wait --for=condition=ready=false --timeout=500s AtlasSchema/mysql +kubectl-wait-ready AtlasSchema/mysql + +# Hash should be updated +kubectl get -o jsonpath --template='{.status.observed_hash}' AtlasSchema/mysql +stdout mlBKa2H4Mt7J8uStzuFj6Ps0XRIB9z3EuZPfOw6BeIA\= + +-- schema-v1/schema.sql -- +create table users ( + id int not null auto_increment, + name varchar(255) not null, + email varchar(255) unique not null, + short_bio varchar(255) not null, + primary key (id) +); +-- schema-v2/schema.sql -- +create table users ( + id int not null auto_increment, + name varchar(255) not null, + email varchar(255) unique not null, + short_bio varchar(255) not null, + phone varchar(255) not null, + primary key (id) +); +-- schema.yaml -- +apiVersion: db.atlasgo.io/v1alpha1 +kind: AtlasSchema +metadata: + name: mysql +spec: + urlFrom: + secretKeyRef: + name: db-creds + key: url + schema: + sql: | + create table users ( + id int not null auto_increment, + name varchar(255) not null, + email varchar(255) unique not null, + short_bio varchar(255) not null, + primary key (id) + ); +-- schema-v2-patch.yaml -- +spec: + schema: + sql: | + create table users ( + id int not null auto_increment, + name varchar(255) not null, + email varchar(255) unique not null, + short_bio varchar(255) not null, + phone varchar(255) not null, + primary key (id) + ); +-- database.yaml -- +apiVersion: v1 +kind: Service +metadata: + name: mysql +spec: + selector: + app: mysql + ports: + - name: mysql + port: 3306 + targetPort: mysql + - name: mysql-dev + port: 3307 + targetPort: mysql-dev + type: ClusterIP +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: mysql +spec: + selector: + matchLabels: + app: mysql + replicas: 1 + template: + metadata: + labels: + app: mysql + spec: + containers: + - name: mysql + image: mysql:latest + env: + - name: MYSQL_ROOT_PASSWORD + value: pass + - name: MYSQL_DATABASE + value: myapp + ports: + - containerPort: 3306 + name: mysql + startupProbe: + exec: + command: [ "mysql", "-ppass", "-h", "127.0.0.1", "-e", "SELECT 1" ] + failureThreshold: 30 + periodSeconds: 10 + - name: mysql-dev + image: mysql:latest + env: + - name: MYSQL_ROOT_PASSWORD + value: pass + - name: MYSQL_DATABASE + value: myapp + - name: MYSQL_TCP_PORT + value: "3307" + ports: + - containerPort: 3307 + name: mysql-dev + startupProbe: + exec: + command: [ "mysql", "-ppass", "-h", "127.0.0.1", "-e", "SELECT 1" ] + failureThreshold: 30 + periodSeconds: 10 diff --git a/test/e2e/testscript/schema-policy-diff.txtar b/test/e2e/testscript/schema-policy-diff.txtar index 52b8217..af83aa0 100644 --- a/test/e2e/testscript/schema-policy-diff.txtar +++ b/test/e2e/testscript/schema-policy-diff.txtar @@ -11,16 +11,17 @@ kubectl-wait-ready -l app=mysql pods kubectl apply -f schema.yaml kubectl-wait-ready AtlasSchema/mysql kubectl get -o jsonpath --template='{.status.observed_hash}' AtlasSchema/mysql -stdout ddfe666707ddf5c2cc7625c2a0de89da51e54fc7caa6403db307146430d20d85 +stdout oAoRLC2AXyGha6pKDollSqBB5ovjB\+qK78aAN9dkOow\= # Inspect the schema to ensure it's correct atlas schema inspect -u ${DB_URL} cmp stdout schema.hcl kubectl patch -f schema.yaml --type merge --patch-file patch-remove-bio.yaml +exec sleep 10 kubectl-wait-ready AtlasSchema/mysql # Ensure the schema is updated kubectl get -o jsonpath --template='{.status.observed_hash}' AtlasSchema/mysql -stdout fc62b9f189404d5e38053e5aae9e64bd71c7f854d7001d45c1583d65ac90223c +stdout UmrKZN7GNsjWxLOq6VJ3vejqnvBQU9BeoDZlL\/2LTKU\= # Inspect the schema again to ensure it still has the bio column atlas schema inspect -u ${DB_URL} cmp stdout schema.hcl diff --git a/test/e2e/testscript/schema-review-always.txtar b/test/e2e/testscript/schema-review-always.txtar index feda045..581e211 100644 --- a/test/e2e/testscript/schema-review-always.txtar +++ b/test/e2e/testscript/schema-review-always.txtar @@ -33,7 +33,7 @@ kubectl-wait-ready AtlasSchema/postgres atlas schema inspect -u ${DB_URL} cmp stdout schema-v1.hcl kubectl get -o jsonpath --template='{.status.observed_hash}' AtlasSchema/postgres -stdout 2a59599f3d4f2f07d4fabb87bf872030020fd7410e10a1042ace7a0865953cac +stdout lhl512tzvwQXJ9lroGuRNzGS6fiic8r9ohGMV\+\/Ij0w\= # Apply again without any change in the schema, it should be a no-op kubectl patch -f schema.yaml --type merge --patch-file patch-comment.yaml @@ -44,7 +44,7 @@ kubectl-wait-ready AtlasSchema/postgres atlas schema inspect -u ${DB_URL} cmp stdout schema-v1.hcl kubectl get -o jsonpath --template='{.status.observed_hash}' AtlasSchema/postgres -stdout 6a60e051a482fb40392f768e0a080bc101bd9d87efa21ff40472b1dbd7d4e7a8 +stdout lhl512tzvwQXJ9lroGuRNzGS6fiic8r9ohGMV\+\/Ij0w\= -- empty.hcl -- schema "public" { comment = "standard public schema"