From 0d48ef7e0f204f0a1d53cbed4caae346271a09d0 Mon Sep 17 00:00:00 2001 From: Justin Thompson <37157877+J-Thompson12@users.noreply.github.com> Date: Tue, 27 Jul 2021 13:12:59 -0600 Subject: [PATCH] App pause (#501) * pause/unpause * get deployment type * add suspend check * add examples * combine pause/unpause * add suspend type * update fake flux * spelling error * added wrong file * rebase * address comments Co-authored-by: Justin Thompson --- api/v1alpha1/application_types.go | 8 +++ cmd/wego/app/cmd.go | 14 ++++- cmd/wego/app/pause/cmd.go | 51 +++++++++++++++++ cmd/wego/app/unpause/cmd.go | 51 +++++++++++++++++ pkg/flux/flux.go | 10 ++++ pkg/flux/fluxfakes/fake_flux.go | 86 +++++++++++++++++++++++++++++ pkg/services/app/app.go | 92 +++++++++++++++++++++++++++++++ pkg/services/app/pause.go | 12 ++++ pkg/services/app/status.go | 11 +--- pkg/services/app/unpause.go | 12 ++++ 10 files changed, 335 insertions(+), 12 deletions(-) create mode 100644 cmd/wego/app/pause/cmd.go create mode 100644 cmd/wego/app/unpause/cmd.go create mode 100644 pkg/services/app/pause.go create mode 100644 pkg/services/app/unpause.go diff --git a/api/v1alpha1/application_types.go b/api/v1alpha1/application_types.go index cb34d9f20e..682d8f9528 100644 --- a/api/v1alpha1/application_types.go +++ b/api/v1alpha1/application_types.go @@ -56,6 +56,14 @@ const ( SourceTypeHelm SourceType = "helm" ) +// SuspendAction defines the command run to pause/unpause an application +type SuspendActionType string + +const ( + SuspendAction SuspendActionType = "suspend" + ResumeAction SuspendActionType = "resume" +) + // ApplicationStatus defines the observed state of Application type ApplicationStatus struct { // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster diff --git a/cmd/wego/app/cmd.go b/cmd/wego/app/cmd.go index 072a930ce6..e2757ad20e 100644 --- a/cmd/wego/app/cmd.go +++ b/cmd/wego/app/cmd.go @@ -4,13 +4,15 @@ import ( "github.com/spf13/cobra" "github.com/weaveworks/weave-gitops/cmd/wego/app/add" "github.com/weaveworks/weave-gitops/cmd/wego/app/list" + "github.com/weaveworks/weave-gitops/cmd/wego/app/pause" "github.com/weaveworks/weave-gitops/cmd/wego/app/status" + "github.com/weaveworks/weave-gitops/cmd/wego/app/unpause" ) var ApplicationCmd = &cobra.Command{ Use: "app", Short: "Manages your applications", - Example:` + Example: ` # Add an application to wego from local git repository wego app add . --name @@ -18,7 +20,13 @@ var ApplicationCmd = &cobra.Command{ wego app status # List applications under wego control - wego app list`, + wego app list + + # Pause gitops automation + wego app pause + + # Unpause gitops automation + wego app unpause `, Args: cobra.MinimumNArgs(1), } @@ -26,4 +34,6 @@ func init() { ApplicationCmd.AddCommand(status.Cmd) ApplicationCmd.AddCommand(add.Cmd) ApplicationCmd.AddCommand(list.Cmd) + ApplicationCmd.AddCommand(pause.Cmd) + ApplicationCmd.AddCommand(unpause.Cmd) } diff --git a/cmd/wego/app/pause/cmd.go b/cmd/wego/app/pause/cmd.go new file mode 100644 index 0000000000..c1f9689c77 --- /dev/null +++ b/cmd/wego/app/pause/cmd.go @@ -0,0 +1,51 @@ +package pause + +import ( + "fmt" + "os" + + "github.com/pkg/errors" + "github.com/spf13/cobra" + "github.com/weaveworks/weave-gitops/cmd/wego/version" + "github.com/weaveworks/weave-gitops/pkg/flux" + "github.com/weaveworks/weave-gitops/pkg/kube" + "github.com/weaveworks/weave-gitops/pkg/logger" + "github.com/weaveworks/weave-gitops/pkg/runner" + "github.com/weaveworks/weave-gitops/pkg/services/app" +) + +var params app.PauseParams + +var Cmd = &cobra.Command{ + Use: "pause ", + Short: "Pause an application", + Args: cobra.MinimumNArgs(1), + Example: "wego app pause podinfo", + RunE: runCmd, + SilenceUsage: true, + SilenceErrors: true, + PostRun: func(cmd *cobra.Command, args []string) { + version.CheckVersion(version.CheckpointParamsWithFlags(version.CheckpointParams(), cmd)) + }, +} + +func runCmd(cmd *cobra.Command, args []string) error { + params.Namespace, _ = cmd.Parent().Flags().GetString("namespace") + params.Name = args[0] + + cliRunner := &runner.CLIRunner{} + fluxClient := flux.New(cliRunner) + logger := logger.New(os.Stdout) + kubeClient, err := kube.NewKubeHTTPClient() + if err != nil { + return fmt.Errorf("error initializing kube client: %w", err) + } + + appService := app.New(logger, nil, fluxClient, kubeClient) + + if err := appService.Pause(params); err != nil { + return errors.Wrapf(err, "failed to pause the app %s", params.Name) + } + + return nil +} diff --git a/cmd/wego/app/unpause/cmd.go b/cmd/wego/app/unpause/cmd.go new file mode 100644 index 0000000000..cdc2b9f88c --- /dev/null +++ b/cmd/wego/app/unpause/cmd.go @@ -0,0 +1,51 @@ +package unpause + +import ( + "fmt" + "os" + + "github.com/pkg/errors" + "github.com/spf13/cobra" + "github.com/weaveworks/weave-gitops/cmd/wego/version" + "github.com/weaveworks/weave-gitops/pkg/flux" + "github.com/weaveworks/weave-gitops/pkg/kube" + "github.com/weaveworks/weave-gitops/pkg/logger" + "github.com/weaveworks/weave-gitops/pkg/runner" + "github.com/weaveworks/weave-gitops/pkg/services/app" +) + +var params app.UnpauseParams + +var Cmd = &cobra.Command{ + Use: "unpause ", + Short: "Unpause an application", + Args: cobra.MinimumNArgs(1), + Example: "wego app unpause podinfo", + RunE: runCmd, + SilenceUsage: true, + SilenceErrors: true, + PostRun: func(cmd *cobra.Command, args []string) { + version.CheckVersion(version.CheckpointParamsWithFlags(version.CheckpointParams(), cmd)) + }, +} + +func runCmd(cmd *cobra.Command, args []string) error { + params.Namespace, _ = cmd.Parent().Flags().GetString("namespace") + params.Name = args[0] + + cliRunner := &runner.CLIRunner{} + fluxClient := flux.New(cliRunner) + logger := logger.New(os.Stdout) + kubeClient, err := kube.NewKubeHTTPClient() + if err != nil { + return fmt.Errorf("error initializing kube client: %w", err) + } + + appService := app.New(logger, nil, fluxClient, kubeClient) + + if err := appService.Unpause(params); err != nil { + return errors.Wrapf(err, "failed to unpause the app %s", params.Name) + } + + return nil +} diff --git a/pkg/flux/flux.go b/pkg/flux/flux.go index c538e8d372..6293a1b671 100644 --- a/pkg/flux/flux.go +++ b/pkg/flux/flux.go @@ -7,6 +7,7 @@ import ( "strings" "github.com/pkg/errors" + wego "github.com/weaveworks/weave-gitops/api/v1alpha1" "github.com/weaveworks/weave-gitops/pkg/runner" "github.com/weaveworks/weave-gitops/pkg/version" ) @@ -25,6 +26,7 @@ type Flux interface { CreateSecretGit(name string, url string, namespace string) ([]byte, error) GetVersion() (string, error) GetAllResourcesStatus(name string, namespace string) ([]byte, error) + SuspendOrResumeApp(pause wego.SuspendActionType, name, namespace, deploymentType string) ([]byte, error) } type FluxClient struct { @@ -252,3 +254,11 @@ func (f *FluxClient) fluxPath() (string, error) { path := fmt.Sprintf("%v/.wego/bin", homeDir) return fmt.Sprintf("%v/flux-%v", path, version.FluxVersion), nil } + +func (f *FluxClient) SuspendOrResumeApp(suspend wego.SuspendActionType, name, namespace string, deploymentType string) ([]byte, error) { + args := []string{ + string(suspend), deploymentType, name, fmt.Sprintf("--namespace=%s", namespace), + } + + return f.runFluxCmd(args...) +} diff --git a/pkg/flux/fluxfakes/fake_flux.go b/pkg/flux/fluxfakes/fake_flux.go index d7f31b12fa..2321ca26d0 100644 --- a/pkg/flux/fluxfakes/fake_flux.go +++ b/pkg/flux/fluxfakes/fake_flux.go @@ -4,6 +4,7 @@ package fluxfakes import ( "sync" + "github.com/weaveworks/weave-gitops/api/v1alpha1" "github.com/weaveworks/weave-gitops/pkg/flux" ) @@ -142,6 +143,22 @@ type FakeFlux struct { result1 []byte result2 error } + SuspendOrResumeAppStub func(v1alpha1.SuspendActionType, string, string, string) ([]byte, error) + suspendOrResumeAppMutex sync.RWMutex + suspendOrResumeAppArgsForCall []struct { + arg1 v1alpha1.SuspendActionType + arg2 string + arg3 string + arg4 string + } + suspendOrResumeAppReturns struct { + result1 []byte + result2 error + } + suspendOrResumeAppReturnsOnCall map[int]struct { + result1 []byte + result2 error + } UninstallStub func(string, bool) error uninstallMutex sync.RWMutex uninstallArgsForCall []struct { @@ -744,6 +761,73 @@ func (fake *FakeFlux) InstallReturnsOnCall(i int, result1 []byte, result2 error) }{result1, result2} } +func (fake *FakeFlux) SuspendOrResumeApp(arg1 v1alpha1.SuspendActionType, arg2 string, arg3 string, arg4 string) ([]byte, error) { + fake.suspendOrResumeAppMutex.Lock() + ret, specificReturn := fake.suspendOrResumeAppReturnsOnCall[len(fake.suspendOrResumeAppArgsForCall)] + fake.suspendOrResumeAppArgsForCall = append(fake.suspendOrResumeAppArgsForCall, struct { + arg1 v1alpha1.SuspendActionType + arg2 string + arg3 string + arg4 string + }{arg1, arg2, arg3, arg4}) + stub := fake.SuspendOrResumeAppStub + fakeReturns := fake.suspendOrResumeAppReturns + fake.recordInvocation("SuspendOrResumeApp", []interface{}{arg1, arg2, arg3, arg4}) + fake.suspendOrResumeAppMutex.Unlock() + if stub != nil { + return stub(arg1, arg2, arg3, arg4) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeFlux) SuspendOrResumeAppCallCount() int { + fake.suspendOrResumeAppMutex.RLock() + defer fake.suspendOrResumeAppMutex.RUnlock() + return len(fake.suspendOrResumeAppArgsForCall) +} + +func (fake *FakeFlux) SuspendOrResumeAppCalls(stub func(v1alpha1.SuspendActionType, string, string, string) ([]byte, error)) { + fake.suspendOrResumeAppMutex.Lock() + defer fake.suspendOrResumeAppMutex.Unlock() + fake.SuspendOrResumeAppStub = stub +} + +func (fake *FakeFlux) SuspendOrResumeAppArgsForCall(i int) (v1alpha1.SuspendActionType, string, string, string) { + fake.suspendOrResumeAppMutex.RLock() + defer fake.suspendOrResumeAppMutex.RUnlock() + argsForCall := fake.suspendOrResumeAppArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4 +} + +func (fake *FakeFlux) SuspendOrResumeAppReturns(result1 []byte, result2 error) { + fake.suspendOrResumeAppMutex.Lock() + defer fake.suspendOrResumeAppMutex.Unlock() + fake.SuspendOrResumeAppStub = nil + fake.suspendOrResumeAppReturns = struct { + result1 []byte + result2 error + }{result1, result2} +} + +func (fake *FakeFlux) SuspendOrResumeAppReturnsOnCall(i int, result1 []byte, result2 error) { + fake.suspendOrResumeAppMutex.Lock() + defer fake.suspendOrResumeAppMutex.Unlock() + fake.SuspendOrResumeAppStub = nil + if fake.suspendOrResumeAppReturnsOnCall == nil { + fake.suspendOrResumeAppReturnsOnCall = make(map[int]struct { + result1 []byte + result2 error + }) + } + fake.suspendOrResumeAppReturnsOnCall[i] = struct { + result1 []byte + result2 error + }{result1, result2} +} + func (fake *FakeFlux) Uninstall(arg1 string, arg2 bool) error { fake.uninstallMutex.Lock() ret, specificReturn := fake.uninstallReturnsOnCall[len(fake.uninstallArgsForCall)] @@ -827,6 +911,8 @@ func (fake *FakeFlux) Invocations() map[string][][]interface{} { defer fake.getVersionMutex.RUnlock() fake.installMutex.RLock() defer fake.installMutex.RUnlock() + fake.suspendOrResumeAppMutex.RLock() + defer fake.suspendOrResumeAppMutex.RUnlock() fake.uninstallMutex.RLock() defer fake.uninstallMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} diff --git a/pkg/services/app/app.go b/pkg/services/app/app.go index 39acbf0496..f82a804ad4 100644 --- a/pkg/services/app/app.go +++ b/pkg/services/app/app.go @@ -1,8 +1,11 @@ package app import ( + "context" "fmt" + helmv2 "github.com/fluxcd/helm-controller/api/v2beta1" + kustomizev1 "github.com/fluxcd/kustomize-controller/api/v1beta1" wego "github.com/weaveworks/weave-gitops/api/v1alpha1" "github.com/weaveworks/weave-gitops/pkg/flux" "github.com/weaveworks/weave-gitops/pkg/git" @@ -10,6 +13,7 @@ import ( "github.com/weaveworks/weave-gitops/pkg/kube" "github.com/weaveworks/weave-gitops/pkg/logger" "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" ) // AppService entity that manages applications @@ -20,6 +24,10 @@ type AppService interface { Get(name types.NamespacedName) (*wego.Application, error) // Status returns flux resources status and the last successful reconciliation time Status(params StatusParams) (string, string, error) + // Pause pauses the gitops automation for an app + Pause(params PauseParams) error + // Unpause resumes the gitops automation for an app + Unpause(params UnpauseParams) error } type App struct { @@ -54,3 +62,87 @@ func createGitProvider(token string) (gitproviders.GitProvider, error) { return provider, nil } + +func (a *App) getDeploymentType(ctx context.Context, name string, namespace string) (wego.DeploymentType, error) { + app, err := a.kube.GetApplication(ctx, types.NamespacedName{Name: name, Namespace: namespace}) + if err != nil { + return wego.DeploymentTypeKustomize, err + } + + return wego.DeploymentType(app.Spec.DeploymentType), nil +} + +func (a *App) getSuspendedStatus(ctx context.Context, name, namespace string, deploymentType wego.DeploymentType) (bool, error) { + var automation client.Object + + switch deploymentType { + case wego.DeploymentTypeKustomize: + automation = &kustomizev1.Kustomization{} + case wego.DeploymentTypeHelm: + automation = &helmv2.HelmRelease{} + default: + return false, fmt.Errorf("invalid deployment type: %v", deploymentType) + } + + if err := a.kube.GetResource(ctx, types.NamespacedName{Namespace: namespace, Name: name}, automation); err != nil { + return false, err + } + + suspendStatus := false + + switch at := automation.(type) { + case *kustomizev1.Kustomization: + suspendStatus = at.Spec.Suspend + case *helmv2.HelmRelease: + suspendStatus = at.Spec.Suspend + } + return suspendStatus, nil +} + +func (a *App) pauseOrUnpause(suspendAction wego.SuspendActionType, name, namespace string) error { + ctx := context.Background() + deploymentType, err := a.getDeploymentType(ctx, name, namespace) + if err != nil { + return fmt.Errorf("unable to determine deployment type for %s: %s", name, err) + } + + suspendStatus, err := a.getSuspendedStatus(ctx, name, namespace, deploymentType) + if err != nil { + return fmt.Errorf("failed to get suspended status: %s", err) + } + + switch deploymentType { + case wego.DeploymentTypeKustomize: + deploymentType = "kustomization" + case wego.DeploymentTypeHelm: + deploymentType = "helmrelease" + default: + return fmt.Errorf("invalid deployment type: %v", deploymentType) + } + + switch suspendAction { + case wego.SuspendAction: + if suspendStatus { + a.logger.Printf("app %s is already paused\n", name) + return nil + } + out, err := a.flux.SuspendOrResumeApp(suspendAction, name, namespace, string(deploymentType)) + if err != nil { + return fmt.Errorf("unable to pause %s err: %s", name, err) + } + a.logger.Printf("%s\n gitops automation paused for %s\n", string(out), name) + return nil + case wego.ResumeAction: + if !suspendStatus { + a.logger.Printf("app %s is already reconciling\n", name) + return nil + } + out, err := a.flux.SuspendOrResumeApp(suspendAction, name, namespace, string(deploymentType)) + if err != nil { + return fmt.Errorf("unable to unpause %s err: %s", name, err) + } + a.logger.Printf("%s\n gitops automation unpaused for %s\n", string(out), name) + return nil + } + return fmt.Errorf("invalid suspend action") +} diff --git a/pkg/services/app/pause.go b/pkg/services/app/pause.go new file mode 100644 index 0000000000..2a8c4b4e8d --- /dev/null +++ b/pkg/services/app/pause.go @@ -0,0 +1,12 @@ +package app + +import wego "github.com/weaveworks/weave-gitops/api/v1alpha1" + +type PauseParams struct { + Name string + Namespace string +} + +func (a *App) Pause(params PauseParams) error { + return a.pauseOrUnpause(wego.SuspendAction, params.Name, params.Namespace) +} diff --git a/pkg/services/app/status.go b/pkg/services/app/status.go index 5baf3bc1c4..8ca3a1a9c4 100644 --- a/pkg/services/app/status.go +++ b/pkg/services/app/status.go @@ -24,7 +24,7 @@ func (a *App) Status(params StatusParams) (string, string, error) { } ctx := context.Background() - deploymentType, err := a.getDeploymentType(ctx, params) + deploymentType, err := a.getDeploymentType(ctx, params.Name, params.Namespace) if err != nil { return "", "", fmt.Errorf("failed getting app deployment type: %w", err) } @@ -37,15 +37,6 @@ func (a *App) Status(params StatusParams) (string, string, error) { return string(fluxOutput), lastRecon, nil } -func (a *App) getDeploymentType(ctx context.Context, params StatusParams) (wego.DeploymentType, error) { - app, err := a.kube.GetApplication(ctx, types.NamespacedName{Name: params.Name, Namespace: params.Namespace}) - if err != nil { - return wego.DeploymentTypeKustomize, err - } - - return wego.DeploymentType(app.Spec.DeploymentType), nil -} - func (a *App) getLastSuccessfulReconciliation(ctx context.Context, deploymentType wego.DeploymentType, params StatusParams) (string, error) { conditions := []metav1.Condition{} switch deploymentType { diff --git a/pkg/services/app/unpause.go b/pkg/services/app/unpause.go new file mode 100644 index 0000000000..ccceff8654 --- /dev/null +++ b/pkg/services/app/unpause.go @@ -0,0 +1,12 @@ +package app + +import wego "github.com/weaveworks/weave-gitops/api/v1alpha1" + +type UnpauseParams struct { + Name string + Namespace string +} + +func (a *App) Unpause(params UnpauseParams) error { + return a.pauseOrUnpause(wego.ResumeAction, params.Name, params.Namespace) +}