internal/controller: generate hash of declarative flow by using "schema inspect" #253

Merged (1 commit) on Jan 15, 2025
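
This change replaces the controller's local SHA-256 over the manifest with the hash reported by "schema inspect" using the "{{ .Hash }}" format, so the stored hash reflects the schema as Atlas inspects it rather than the raw bytes of the spec. Below is a minimal standalone sketch of that call, not code from this PR: it assumes the atlas binary is on PATH and that ariga.io/atlas-go-sdk/atlasexec is available, and the sqlite URL is only illustrative, whereas the controller points the inspection at env://schema.src or env://src inside its rendered atlas.hcl.

package main

import (
	"context"
	"fmt"
	"log"

	"ariga.io/atlas-go-sdk/atlasexec"
)

func main() {
	// NewClient takes a working directory and the path to the atlas binary.
	client, err := atlasexec.NewClient(".", "atlas")
	if err != nil {
		log.Fatal(err)
	}
	// Ask inspect to render only the content hash of the schema.
	hash, err := client.SchemaInspect(context.Background(), &atlasexec.SchemaInspectParams{
		URL:    "sqlite://file.db", // illustrative target URL only
		Format: `{{ .Hash }}`,
	})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("schema hash:", hash)
}
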
97 changes: 53 additions & 44 deletions internal/controller/atlasschema_controller.go
@@ -17,8 +17,6 @@ package controller
import (
"bytes"
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"io"
@@ -140,23 +138,14 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
res.SetNotReady("ReadSchema", "Multiple targets are not supported")
return ctrl.Result{}, nil
}
hash, err := data.hash()
if err != nil {
res.SetNotReady("CalculatingHash", err.Error())
r.recordErrEvent(res, err)
return result(err)
}
// We need to update the ready condition immediately before doing
// any heavy jobs if the hash is different from the last applied.
// This is to ensure that other tools know we are still applying the changes.
if res.IsReady() && res.IsHashModified(hash) {
res.SetNotReady("Reconciling", "current schema does not match last applied")
return ctrl.Result{Requeue: true}, nil
opts := []atlasexec.Option{atlasexec.WithAtlasHCL(data.render)}
if u := data.Desired; u != nil && u.Scheme == dbv1alpha1.SchemaTypeFile {
// Write the schema file to the working directory.
opts = append(opts, func(ce *atlasexec.WorkingDir) error {
_, err := ce.WriteFile(filepath.Join(u.Host, u.Path), data.schema)
return err
})
}
// ====================================================
// Starting area to handle the heavy jobs.
// Below this line is the main logic of the controller.
// ====================================================
if !data.hasDevURL() && data.URL != nil {
// The user has not specified an URL for dev-db,
// spin up a dev-db and get the connection string.
@@ -166,17 +155,15 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return result(err)
}
}
opts := []atlasexec.Option{atlasexec.WithAtlasHCL(data.render)}
if u := data.Desired; u != nil && u.Scheme == dbv1alpha1.SchemaTypeFile {
// Write the schema file to the working directory.
opts = append(opts, func(ce *atlasexec.WorkingDir) error {
_, err := ce.WriteFile(filepath.Join(u.Host, u.Path), data.schema)
return err
})
}
// Create a working directory for the Atlas CLI
// The working directory contains the atlas.hcl config.
wd, err := atlasexec.NewWorkingDir(opts...)
if err != nil {
res.SetNotReady("CreatingWorkingDir", err.Error())
r.recordErrEvent(res, err)
return result(err)
}
defer wd.Close()
// This function will be used to edit and re-render the atlas.hcl file in the working directory.
editAtlasHCL := func(fn func(m *managedData)) error {
fn(data)
@@ -187,18 +174,38 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
_, err = wd.WriteFile("atlas.hcl", buf.Bytes())
return err
}
cli, err := r.atlasClient(wd.Path(), data.Cloud)
if err != nil {
res.SetNotReady("CreatingWorkingDir", err.Error())
res.SetNotReady("CreatingAtlasClient", err.Error())
r.recordErrEvent(res, err)
return result(err)
}
defer wd.Close()
cli, err := r.atlasClient(wd.Path(), data.Cloud)
// Calculate the hash of the current schema.
hash, err := cli.SchemaInspect(ctx, &atlasexec.SchemaInspectParams{
Env: data.EnvName,
URL: data.targetURL(),
Format: `{{ .Hash }}`,
Vars: data.Vars,
})
if err != nil {
res.SetNotReady("CreatingAtlasClient", err.Error())
res.SetNotReady("CalculatingHash", err.Error())
r.recordErrEvent(res, err)
if isConnectionErr(err) {
err = transient(err)
}
return result(err)
}
// We need to update the ready condition immediately before doing
// any heavy jobs if the hash is different from the last applied.
// This is to ensure that other tools know we are still applying the changes.
if res.IsReady() && res.IsHashModified(hash) {
res.SetNotReady("Reconciling", "current schema does not match last applied")
return ctrl.Result{Requeue: true}, nil
}
// ====================================================
// Starting area to handle the heavy jobs.
// Below this line is the main logic of the controller.
// ====================================================
var whoami *atlasexec.WhoAmI
switch whoami, err = cli.WhoAmI(ctx); {
case errors.Is(err, atlasexec.ErrRequireLogin):
@@ -251,9 +258,6 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
// And the schema is available for the Atlas CLI (on local machine)
// to modify or approve the changes.
if data.Desired.Scheme == dbv1alpha1.SchemaTypeFile {
log.Info("schema is a file, pushing the schema to Atlas Cloud")
// Using hash of desired state as the tag for the schema.
// This ensures push is idempotent.
tag, err := cli.SchemaInspect(ctx, &atlasexec.SchemaInspectParams{
Env: data.EnvName,
URL: desiredURL,
@@ -270,6 +274,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
r.recordErrEvent(res, err)
return result(err)
}
log.Info("schema is a file, pushing the schema to Atlas Cloud")
state, err := cli.SchemaPush(ctx, &atlasexec.SchemaPushParams{
Env: data.EnvName,
Name: path.Join(repo.Host, repo.Path),
@@ -744,17 +749,6 @@ func (d *managedData) hasLintReview() bool {
return lint.Body().GetAttribute("review") != nil
}

// hash returns the sha256 hash of the desired.
func (d *managedData) hash() (string, error) {
h := sha256.New()
if len(d.schema) > 0 {
h.Write([]byte(d.schema))
} else {
h.Write([]byte(d.Desired.String()))
}
return hex.EncodeToString(h.Sum(nil)), nil
}

func (d *managedData) repoURL() *url.URL {
switch {
// The user has provided the repository name.
@@ -773,6 +767,17 @@ func (d *managedData) repoURL() *url.URL {
}
}

// targetURL returns the target URL for the environment.
func (d *managedData) targetURL() string {
if d.Config != nil {
env := searchBlock(d.Config.Body(), hclwrite.NewBlock("env", []string{d.EnvName}))
if env != nil && env.Body().GetAttribute("src") != nil {
return "env://src"
}
}
return "env://schema.src"
}

// render renders the atlas.hcl template.
//
// The template is used by the Atlas CLI to apply the schema.
@@ -857,6 +862,10 @@ func (d *managedData) asBlocks() []*hclwrite.Block {
env := hclwrite.NewBlock("env", []string{d.EnvName})
blocks = append(blocks, env)
envBody := env.Body()
if d.Desired != nil {
schema := envBody.AppendNewBlock("schema", nil).Body()
schema.SetAttributeValue("src", cty.StringVal(d.Desired.String()))
Review comment (Member) on lines +866 to +867:
also set this schema.src to file://schema.hcl or file://schema.sql

}
if d.URL != nil {
envBody.SetAttributeValue("url", cty.StringVal(d.URL.String()))
}
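
The rendered atlas.hcl now carries the desired state in a nested schema block inside the env block (see asBlocks above), which targetURL addresses as env://schema.src; when a user-supplied config already sets src on the env block itself, env://src is used instead. A small hclwrite sketch of the operator-rendered shape, illustrative only and not code from this PR, with the env label and file name taken from the tests below:

package main

import (
	"fmt"

	"github.com/hashicorp/hcl/v2/hclwrite"
	"github.com/zclconf/go-cty/cty"
)

func main() {
	f := hclwrite.NewEmptyFile()
	env := f.Body().AppendNewBlock("env", []string{"kubernetes"}).Body()

	// Operator-rendered form: the desired state lives in a nested schema
	// block, so the controller inspects "env://schema.src".
	schema := env.AppendNewBlock("schema", nil).Body()
	schema.SetAttributeValue("src", cty.StringVal("file://schema.sql"))

	// A custom config may instead set src directly on the env block; in
	// that case targetURL returns "env://src".
	// env.SetAttributeValue("src", cty.StringVal("file://schema.sql"))

	fmt.Print(string(f.Bytes()))
}
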
13 changes: 11 additions & 2 deletions internal/controller/atlasschema_controller_test.go
@@ -77,7 +77,7 @@ func TestReconcile_ReadyButDiff(t *testing.T) {
Spec: dbv1alpha1.AtlasSchemaSpec{
Schema: dbv1alpha1.Schema{SQL: "create table foo (id int primary key);"},
TargetSpec: dbv1alpha1.TargetSpec{
URL: "mysql://root:password@localhost:3306/test",
URL: "sqlite://file?mode=memory",
},
},
Status: dbv1alpha1.AtlasSchemaStatus{
@@ -442,7 +442,7 @@ func TestBadSQL(t *testing.T) {
cont := tt.cond()
require.EqualValues(t, schemaReadyCond, cont.Type)
require.EqualValues(t, metav1.ConditionFalse, cont.Status)
require.EqualValues(t, "LintPolicyError", cont.Reason)
require.EqualValues(t, "CalculatingHash", cont.Reason)
require.Contains(t, cont.Message, "executing statement:")
}

@@ -500,6 +500,9 @@ func TestConfigTemplate(t *testing.T) {
err := data.render(&buf)
require.NoError(t, err)
expected := `env "kubernetes" {
schema {
src = "file://schema.sql"
}
url = "mysql://root:password@localhost:3306/test"
dev = "mysql://root:password@localhost:3306/dev"
schemas = ["foo", "bar"]
@@ -573,6 +576,9 @@ env "kubernetes" {
dev = "mysql://root:password@localhost:3306/dev"
schemas = ["foo", "bar"]
url = "mysql://root:password@localhost:3306/test"
schema {
src = "file://schema.sql"
}
}
`
require.EqualValues(t, expected, buf.String())
@@ -599,6 +605,9 @@ env {
dev = "mysql://root:password@localhost:3306/dev"
schemas = ["foo", "bar"]
url = "mysql://root:password@localhost:3306/test"
schema {
src = "file://schema.sql"
}
}`
require.EqualValues(t, expected, buf.String())
}
9 changes: 9 additions & 0 deletions internal/controller/common.go
@@ -152,6 +152,15 @@ func isChecksumErr(err error) bool {
return strings.Contains(err.Error(), "checksum mismatch")
}

// isConnectionErr returns true if the error is a connection error.
func isConnectionErr(err error) bool {
if err == nil {
return false
}
return strings.Contains(err.Error(), "connection timed out") ||
strings.Contains(err.Error(), "connection refused")
}

// transientError is an error that should be retried.
type transientError struct {
err error
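
Connection failures from the new inspect call are wrapped as transient so the reconcile is retried instead of being reported as a permanent failure. The following is a self-contained sketch of that pattern; the transientError type already exists in common.go and its real definition may differ from this sketch.

package main

import (
	"errors"
	"fmt"
	"strings"
)

// transientError mirrors the idea of the controller's transient wrapper:
// callers detect it with errors.As and requeue instead of failing hard.
type transientError struct{ err error }

func (e *transientError) Error() string { return e.err.Error() }
func (e *transientError) Unwrap() error { return e.err }

func transient(err error) error { return &transientError{err: err} }

func isConnectionErr(err error) bool {
	if err == nil {
		return false
	}
	return strings.Contains(err.Error(), "connection timed out") ||
		strings.Contains(err.Error(), "connection refused")
}

func main() {
	err := errors.New("dial tcp 10.0.0.1:3306: connect: connection refused")
	if isConnectionErr(err) {
		err = transient(err)
	}
	var te *transientError
	fmt.Println("requeue instead of fail:", errors.As(err, &te)) // true
}
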
129 changes: 129 additions & 0 deletions test/e2e/testscript/schema-hash-configmap.txtar
@@ -0,0 +1,129 @@
env DB_URL=mysql://root:pass@mysql.${NAMESPACE}:3306/myapp
kubectl apply -f database.yaml
kubectl create secret generic db-creds --from-literal=url=${DB_URL}

# Wait for the first pod created
kubectl-wait-available deploy/mysql
# Wait for the DB ready before creating the schema
kubectl-wait-ready -l app=mysql pods

# Create the configmap to store the schema.sql
kubectl create configmap mysql-schema --from-file=./schema-v1 --dry-run=client -o yaml
stdin stdout
kubectl apply -f -

# Create the schema
kubectl apply -f schema.yaml
kubectl-wait-ready AtlasSchema/mysql

kubectl get -o jsonpath --template='{.status.observed_hash}' AtlasSchema/mysql
stdout oAoRLC2AXyGha6pKDollSqBB5ovjB\+qK78aAN9dkOow\=

# Update the configmap with the new schema
kubectl create configmap mysql-schema --from-file=./schema-v2 --dry-run=client -o yaml
stdin stdout
kubectl apply -f -

# Ensure the controller is aware of the change
kubectl wait --for=condition=ready=false --timeout=500s AtlasSchema/mysql
kubectl-wait-ready AtlasSchema/mysql

# Hash should be updated
kubectl get -o jsonpath --template='{.status.observed_hash}' AtlasSchema/mysql
stdout mlBKa2H4Mt7J8uStzuFj6Ps0XRIB9z3EuZPfOw6BeIA\=

-- schema-v1/schema.sql --
create table users (
id int not null auto_increment,
name varchar(255) not null,
email varchar(255) unique not null,
short_bio varchar(255) not null,
primary key (id)
);
-- schema-v2/schema.sql --
create table users (
id int not null auto_increment,
name varchar(255) not null,
email varchar(255) unique not null,
short_bio varchar(255) not null,
phone varchar(255) not null,
primary key (id)
);
-- schema.yaml --
apiVersion: db.atlasgo.io/v1alpha1
kind: AtlasSchema
metadata:
name: mysql
spec:
urlFrom:
secretKeyRef:
name: db-creds
key: url
schema:
configMapKeyRef:
key: schema.sql
name: mysql-schema
-- database.yaml --
apiVersion: v1
kind: Service
metadata:
name: mysql
spec:
selector:
app: mysql
ports:
- name: mysql
port: 3306
targetPort: mysql
- name: mysql-dev
port: 3307
targetPort: mysql-dev
type: ClusterIP
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: mysql
spec:
selector:
matchLabels:
app: mysql
replicas: 1
template:
metadata:
labels:
app: mysql
spec:
containers:
- name: mysql
image: mysql:latest
env:
- name: MYSQL_ROOT_PASSWORD
value: pass
- name: MYSQL_DATABASE
value: myapp
ports:
- containerPort: 3306
name: mysql
startupProbe:
exec:
command: [ "mysql", "-ppass", "-h", "127.0.0.1", "-e", "SELECT 1" ]
failureThreshold: 30
periodSeconds: 10
- name: mysql-dev
image: mysql:latest
env:
- name: MYSQL_ROOT_PASSWORD
value: pass
- name: MYSQL_DATABASE
value: myapp
- name: MYSQL_TCP_PORT
value: "3307"
ports:
- containerPort: 3307
name: mysql-dev
startupProbe:
exec:
command: [ "mysql", "-ppass", "-h", "127.0.0.1", "-e", "SELECT 1" ]
failureThreshold: 30
periodSeconds: 10