Skip to content

Commit

Permalink
feat(ledger): add stateless version starting from v2.2 (#165)
Browse files Browse the repository at this point in the history
* feat(ledger): add stateless version starting from v2.2

* chore: remove legacy tests
  • Loading branch information
gfyrag authored Oct 22, 2024
1 parent f200e37 commit 9e11474
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 128 deletions.
4 changes: 2 additions & 2 deletions internal/resources/ledgers/assets/Caddyfile.gotpl
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@

handle {
method GET
reverse_proxy ledger-read:8080{
reverse_proxy ledger-read:8080 {
header_up Host {upstream_hostport}
}
}

handle {
reverse_proxy ledger-write:8080{
reverse_proxy ledger-write:8080 {
header_up Host {upstream_hostport}
}
}
Expand Down
75 changes: 72 additions & 3 deletions internal/resources/ledgers/deployments.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ledgers

import (
"fmt"
"golang.org/x/mod/semver"
"strconv"

"github.com/formancehq/operator/internal/resources/brokers"
Expand Down Expand Up @@ -73,8 +74,14 @@ func hasDeploymentStrategyChanged(ctx core.Context, stack *v1beta1.Stack, ledger
}
}

func installLedger(ctx core.Context, stack *v1beta1.Stack,
ledger *v1beta1.Ledger, database *v1beta1.Database, image string, isV2 bool) (err error) {
func installLedger(ctx core.Context, stack *v1beta1.Stack, ledger *v1beta1.Ledger, database *v1beta1.Database, image string, version string, isV2 bool) (err error) {

if !semver.IsValid(version) || semver.Compare(version, "v2.2.0-alpha") > 0 {
if err := uninstallLedgerMonoWriterMultipleReader(ctx, stack); err != nil {
return err
}
return installLedgerStateless(ctx, stack, ledger, database, image, isV2)
}

deploymentStrategySettings, err := settings.GetStringOrDefault(ctx, stack.Name, v1beta1.DeploymentStrategySingle, "ledger", "deployment-strategy")
if err != nil {
Expand All @@ -86,7 +93,7 @@ func installLedger(ctx core.Context, stack *v1beta1.Stack,
}

if err = hasDeploymentStrategyChanged(ctx, stack, ledger, deploymentStrategySettings); err != nil {
return
return err
}

switch deploymentStrategySettings {
Expand Down Expand Up @@ -135,6 +142,68 @@ func installLedgerSingleInstance(ctx core.Context, stack *v1beta1.Stack,
return nil
}

func installLedgerStateless(ctx core.Context, stack *v1beta1.Stack,
ledger *v1beta1.Ledger, database *v1beta1.Database, version string, v2 bool) error {
container := corev1.Container{
Name: "ledger",
}
container.Env = append(container.Env, core.Env("BIND", ":8080"))

var broker *v1beta1.Broker
if t, err := brokertopics.Find(ctx, stack, "ledger"); err != nil {
return err
} else if t != nil && t.Status.Ready {
broker = &v1beta1.Broker{}
if err := ctx.GetClient().Get(ctx, types.NamespacedName{
Name: stack.Name,
}, broker); err != nil {
return err
}
}

if broker != nil {
if !broker.Status.Ready {
return core.NewPendingError().WithMessage("broker not ready")
}

brokerEnvVar, err := brokers.GetBrokerEnvVars(ctx, broker.Status.URI, stack.Name, "ledger")
if err != nil {
return err
}

container.Env = append(container.Env, brokerEnvVar...)
container.Env = append(container.Env, brokers.GetPublisherEnvVars(stack, broker, "ledger", "")...)
}

err := setCommonContainerConfiguration(ctx, stack, ledger, version, database, &container, v2)
if err != nil {
return err
}

serviceAccountName, err := settings.GetAWSServiceAccount(ctx, stack.Name)
if err != nil {
return err
}

tpl := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: "ledger",
},
Spec: appsv1.DeploymentSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{container},
ServiceAccountName: serviceAccountName,
},
},
},
}

return applications.
New(ledger, tpl).
Install(ctx)
}

func getUpgradeContainer(ctx core.Context, stack *v1beta1.Stack, database *v1beta1.Database, image, version string) (corev1.Container, error) {
return databases.MigrateDatabaseContainer(ctx, stack, image, database,
func(m *databases.MigrationConfiguration) {
Expand Down
7 changes: 1 addition & 6 deletions internal/resources/ledgers/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,7 @@ func Reconcile(ctx Context, stack *v1beta1.Stack, ledger *v1beta1.Ledger, versio
}
}

err = installLedger(ctx, stack, ledger, database, image, isV2)
if err != nil {
return err
}

return nil
return installLedger(ctx, stack, ledger, database, image, version, isV2)
}

func init() {
Expand Down
118 changes: 1 addition & 117 deletions internal/tests/ledger_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@ package tests_test

import (
"github.com/formancehq/go-libs/collectionutils"
v1beta1 "github.com/formancehq/operator/api/formance.com/v1beta1"
"github.com/formancehq/operator/api/formance.com/v1beta1"
"github.com/formancehq/operator/internal/core"
"github.com/formancehq/operator/internal/resources/ledgers"
"github.com/formancehq/operator/internal/resources/settings"
. "github.com/formancehq/operator/internal/tests/internal"
"github.com/google/uuid"
Expand Down Expand Up @@ -171,121 +170,6 @@ var _ = Describe("LedgerController", func() {
})
})
})
Context("with multi ready deployment strategy", func() {
JustBeforeEach(func() {
Eventually(func() error {
return LoadResource(stack.Name, "ledger", &appsv1.Deployment{})
}).Should(Succeed())
patch := client.MergeFrom(ledger.DeepCopy())
ledger.Spec.DeploymentStrategy = v1beta1.DeploymentStrategyMonoWriterMultipleReader
Expect(Patch(ledger, patch)).To(Succeed())
})
It("Should update resources", func() {
By("Should remove the original deployment", func() {
Eventually(func() error {
return LoadResource(stack.Name, "ledger", &appsv1.Deployment{})
}).Should(BeNotFound())
})
By("Should create two applications, two services and a gateway", func() {
reader := &appsv1.Deployment{}
Eventually(func() error {
return LoadResource(stack.Name, "ledger-read", reader)
}).Should(Succeed())
Expect(reader).To(BeControlledBy(ledger))

readerService := &corev1.Service{}
Eventually(func() error {
return LoadResource(stack.Name, "ledger-read", readerService)
}).Should(Succeed())
Expect(readerService).To(BeControlledBy(ledger))
Expect(readerService).To(TargetDeployment(reader))

writer := &appsv1.Deployment{}
Eventually(func() error {
return LoadResource(stack.Name, "ledger-write", writer)
}).Should(Succeed())
Expect(writer).To(BeControlledBy(ledger))

writerService := &corev1.Service{}
Eventually(func() error {
return LoadResource(stack.Name, "ledger-write", writerService)
}).Should(Succeed())
Expect(writerService).To(BeControlledBy(ledger))
Expect(writerService).To(TargetDeployment(writer))

gateway := &appsv1.Deployment{}
Eventually(func() error {
return LoadResource(stack.Name, "ledger-gateway", gateway)
}).Should(Succeed())
Expect(gateway).To(BeControlledBy(ledger))
Expect(gateway.Spec.Template.Spec.SecurityContext).NotTo(BeNil())
Expect(gateway.Spec.Template.Spec.Containers).To(HaveLen(1))
Expect(gateway.Spec.Template.Spec.Containers[0].SecurityContext).NotTo(BeNil())
Expect(gateway.Spec.Template.Spec.Containers[0].SecurityContext.Capabilities).NotTo(BeNil())
Expect(gateway.Spec.Template.Spec.Containers[0].SecurityContext.Capabilities.Add).To(HaveLen(1))
Expect(gateway.Spec.Template.Spec.Containers[0].SecurityContext.Capabilities.Add).To(ContainElements(corev1.Capability("NET_BIND_SERVICE")))
})
})
It("Should update the strategy condition", func() {
Eventually(func(g Gomega) string {
g.Expect(LoadResource("", ledger.Name, ledger)).To(Succeed())
g.Expect(ledger.Status.Conditions.Get(ledgers.ConditionTypeDeploymentStrategy)).ToNot(BeNil())
return ledger.Status.Conditions.Get(ledgers.ConditionTypeDeploymentStrategy).Reason
}).Should(Equal(ledgers.ReasonLedgerMonoWriterMultipleReader))
})
It("Should delete the deployment", func() {
deployment := &appsv1.Deployment{}
Eventually(func(g Gomega) bool {
g.Expect(Get(core.GetNamespacedResourceName(stack.Name, "ledger"), deployment)).ToNot(Succeed())
return true
}).Should(BeTrue())
})
When("Updating the deployment strategy to single", func() {
var cp *v1beta1.Ledger
JustBeforeEach(func() {
cp = ledger.DeepCopy()
patch := client.MergeFrom(ledger.DeepCopy())
ledger.Spec.DeploymentStrategy = v1beta1.DeploymentStrategySingle
Expect(Patch(ledger, patch)).To(Succeed())
})
It("Should only have one ConditionDeploymentStrategy per generation", func() {
Eventually(func(g Gomega) bool {
g.Expect(LoadResource("", ledger.Name, ledger)).To(Succeed())
g.Expect(ledger.Generation).ToNot(Equal(cp.Generation))
data := collectionutils.Reduce(
collectionutils.Filter(ledger.Status.Conditions, func(condition v1beta1.Condition) bool {
return condition.Type == ledgers.ConditionTypeDeploymentStrategy
}),
func(acc map[int64]v1beta1.Conditions, condition v1beta1.Condition) map[int64]v1beta1.Conditions {
if _, ok := acc[condition.ObservedGeneration]; !ok {
acc[condition.ObservedGeneration] = append(v1beta1.Conditions{}, condition)
}
return acc
}, map[int64]v1beta1.Conditions{},
)

for _, condition := range data {
g.
Expect(condition).
To(HaveLen(1))
}
return true

}).Should(BeTrue())
})
It("Should have the latest generation ConditionDeploymentStrategy", func() {
Eventually(func(g Gomega) bool {
g.Expect(LoadResource("", ledger.Name, ledger)).To(Succeed())
g.Expect(ledger.Generation).ToNot(Equal(cp.Generation))
return ledger.GetConditions().Check(v1beta1.AndConditions(
v1beta1.ConditionTypeMatch(ledgers.ConditionTypeDeploymentStrategy),
v1beta1.ConditionGenerationMatch(ledger.Generation),
))

}).Should(BeTrue())
})
})
})
Context("With Search enabled", func() {
var search *v1beta1.Search
BeforeEach(func() {
Expand Down

0 comments on commit 9e11474

Please sign in to comment.