Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: helm concurrent deploys with cross dependency #9588

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion docs-v2/content/en/schemas/v4beta12.json
Original file line number Diff line number Diff line change
Expand Up @@ -2369,6 +2369,15 @@
"description": "if `true`, Skaffold will send `--create-namespace` flag to Helm CLI. `--create-namespace` flag is available in Helm since version 3.2. Defaults is `false`.",
"x-intellij-html-description": "if <code>true</code>, Skaffold will send <code>--create-namespace</code> flag to Helm CLI. <code>--create-namespace</code> flag is available in Helm since version 3.2. Defaults is <code>false</code>."
},
"dependsOn": {
"items": {
"type": "string"
},
"type": "array",
"description": "specifies other helm packages that this HelmRelease depends on, ensuring proper ordering during deployment.",
"x-intellij-html-description": "specifies other helm packages that this HelmRelease depends on, ensuring proper ordering during deployment.",
"default": "[]"
},
"name": {
"type": "string",
"description": "name of the Helm release. It accepts environment variables via the go template syntax.",
Expand Down Expand Up @@ -2484,7 +2493,8 @@
"repo",
"upgradeOnChange",
"overrides",
"packaged"
"packaged",
"dependsOn"
],
"additionalProperties": false,
"type": "object",
Expand Down
58 changes: 58 additions & 0 deletions pkg/skaffold/deploy/helm/helm.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,8 @@ func (h *Deployer) Deploy(ctx context.Context, out io.Writer, builds []graph.Art

olog.Entry(ctx).Infof("Deploying with helm v%s ...", h.bV)

releaseChannels := make(map[string]chan bool)

nsMap := map[string]struct{}{}
manifests := manifest.ManifestList{}
g, ctx := errgroup.WithContext(ctx)
Expand All @@ -271,6 +273,16 @@ func (h *Deployer) Deploy(ctx context.Context, out io.Writer, builds []graph.Art
} else {
g.SetLimit(*h.Concurrency)
olog.Entry(ctx).Infof("Installing %d releases concurrently", len(h.Releases))

for _, r := range h.Releases {
// Check if there is any process that depends on this deployment
if h.hasDependentReleases(r.Name) {
if _, ok := releaseChannels[r.Name]; !ok {
olog.Entry(ctx).Debugf("Helm concurrent deployment channel created %q", r.Name)
releaseChannels[r.Name] = make(chan bool)
}
}
}
}

var mu sync2.Mutex
Expand All @@ -295,8 +307,33 @@ func (h *Deployer) Deploy(ctx context.Context, out io.Writer, builds []graph.Art
return helm.UserErr(fmt.Sprintf("cannot expand chart path %q", r.ChartPath), err)
}

for _, dependency := range r.DependsOn {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about nested dependencies?
ServiceA depends on ServiceB and ServiceB depends on Service C

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have not created a test for this case but, in theory, it should work the same way. Here's why:

  • The first action is to check all services that has dependents, then create the channels to use as wait controllers;
  • So, In the case you're suggesting:
    • If ServiceC starts first, it will hit the signal break with channels; It's gonna wait for Service B to continue;
    • If ServiceB starts in second, it will hit the signal break too, and wait for ServiceA
    • ServiceA does not depends on anyone, so it deploys and send the "all clear" signal to his dependents
    • ServiceB get's the "all clear" signal, and starts this deployment. When finished, it will also sends his signal for ServiceC
    • Finally, ServiceC get's his all clear and do this deployment.

One scenario that may happen is a deadlock. I did not think about it on this PR.

Another thing that I'm thinking is that this deserve a integration testing. I'm still learning about this repo and I will create a separated PR for that. This PR already has the label "size/L" 😱

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest you to check this approach #9588 (comment), because the current approach will lead to deadlock(concurrency = 3 and 3 dependencies are waiting, for example).
don't worry about the "size/L", it's ok)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok thanks for the feedback @idsulik . I'll work on that. Now I see the point, it's easy to get to a deadlock situation because of the lack of resources.

dependencyChannel := releaseChannels[dependency]

if dependencyChannel != nil {
olog.Entry(ctx).Infof("Helm deployment %q waiting for %q to finish", r.Name, dependency)

result := <-dependencyChannel

if !result {
return helm.UserErr(fmt.Sprintf("dependency %q failed, %q will not continue", dependency, releaseName), err)
} else {
olog.Entry(ctx).Infof("Helm deployment dependency %q finished. Starting %q now...", dependency, r.Name)
}
} else {
olog.Entry(ctx).Warnf("Helm deployment %q have not found the helm config %q. Check the dependsOn property on your skaffold.yaml file.", r.Name, dependency)
}
}

m, results, err := h.deployRelease(ctx, out, releaseName, r, builds, h.bV, chartVersion, repo)
if err != nil {
// If there is a channel, send a message to fail
c := releaseChannels[r.Name]

if c != nil {
c <- false
close(c)
}
return helm.UserErr(fmt.Sprintf("deploying %q", releaseName), err)
}

Expand All @@ -319,6 +356,14 @@ func (h *Deployer) Deploy(ctx context.Context, out io.Writer, builds []graph.Art
}
mu.Unlock()

// If there is a channel, send the success signal and close it
c := releaseChannels[r.Name]

if c != nil {
c <- true
close(c)
}

return nil
})
}
Expand Down Expand Up @@ -347,6 +392,19 @@ func (h *Deployer) Deploy(ctx context.Context, out io.Writer, builds []graph.Art
return nil
}

func (h *Deployer) hasDependentReleases(releaseName string) bool {
for _, v := range h.Releases {
if len(v.DependsOn) > 0 {
for _, dep := range v.DependsOn {
if dep == releaseName { // If there is someone who depends on me...
return true
}
}
}
}
return false
}

// Dependencies returns a list of files that the deployer depends on.
func (h *Deployer) Dependencies() ([]string, error) {
var deps []string
Expand Down
196 changes: 196 additions & 0 deletions pkg/skaffold/deploy/helm/helm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,34 @@ var testTwoReleases = latest.LegacyHelmDeploy{
}},
}

var testTreeReleasesWithDependencies = latest.LegacyHelmDeploy{
Releases: []latest.HelmRelease{{
Name: "other",
ChartPath: "examples/test",
}, {
Name: "skaffold-helm",
ChartPath: "examples/test",
}, {
Name: "skaffold-helm-remote",
ChartPath: "examples/test",
DependsOn: []string{"skaffold-helm"},
}},
}

var testTreeReleasesWithDependenciesWrongConfig = latest.LegacyHelmDeploy{
Releases: []latest.HelmRelease{{
Name: "other",
ChartPath: "examples/test",
}, {
Name: "skaffold-helm",
ChartPath: "examples/test",
}, {
Name: "skaffold-helm-remote",
ChartPath: "examples/test",
DependsOn: []string{"skaffold-helm-v2"},
}},
}

var createNamespaceFlag = true
var testDeployCreateNamespaceConfig = latest.LegacyHelmDeploy{
Releases: []latest.HelmRelease{{
Expand Down Expand Up @@ -1079,6 +1107,174 @@ func TestHelmDeployConcurrently(t *testing.T) {
}
}

func TestHelmDeployConcurrentlyWithCrossDependency(t *testing.T) {
tmpDir, err := os.MkdirTemp("", "TestHelmDeploy")
if err != nil {
t.Fatalf("tempdir: %v", err)
}

tests := []struct {
description string
commands util.Command
env []string
helm latest.LegacyHelmDeploy
namespace string
configure func(*Deployer)
builds []graph.Artifact
force bool
shouldErr bool
expectedWarnings []string
expectedNamespaces []string
}{
{
description: "Deploy with multiple helm packages with dependency within each other",
commands: concurrency.
CmdRunWithOutput("helm version --client", version31).
AndRun("helm --kube-context kubecontext get all other --kubeconfig kubeconfig").
AndRun("helm --kube-context kubecontext dep build examples/test --kubeconfig kubeconfig").
AndRunEnv("helm --kube-context kubecontext upgrade other examples/test --post-renderer SKAFFOLD-BINARY --kubeconfig kubeconfig",
[]string{"SKAFFOLD_FILENAME=test.yaml", "SKAFFOLD_CMDLINE=filter --kube-context kubecontext --build-artifacts TMPFILE --kubeconfig kubeconfig"}).
AndRun("helm --kube-context kubecontext get all other --template {{.Release.Manifest}} --kubeconfig kubeconfig").
AndRun("helm --kube-context kubecontext get all skaffold-helm --kubeconfig kubeconfig").
AndRun("helm --kube-context kubecontext dep build examples/test --kubeconfig kubeconfig").
AndRun("helm --kube-context kubecontext get all skaffold-helm-remote --kubeconfig kubeconfig").
AndRun("helm --kube-context kubecontext dep build examples/test --kubeconfig kubeconfig").
AndRunEnv("helm --kube-context kubecontext upgrade skaffold-helm examples/test --post-renderer SKAFFOLD-BINARY --kubeconfig kubeconfig",
[]string{"SKAFFOLD_FILENAME=test.yaml", "SKAFFOLD_CMDLINE=filter --kube-context kubecontext --build-artifacts TMPFILE --kubeconfig kubeconfig"}).
AndRunEnv("helm --kube-context kubecontext upgrade skaffold-helm-remote examples/test --post-renderer SKAFFOLD-BINARY --kubeconfig kubeconfig",
[]string{"SKAFFOLD_FILENAME=test.yaml", "SKAFFOLD_CMDLINE=filter --kube-context kubecontext --build-artifacts TMPFILE --kubeconfig kubeconfig"}).
AndRunWithOutput("helm --kube-context kubecontext get all skaffold-helm --template {{.Release.Manifest}} --kubeconfig kubeconfig", validDeployYaml).
AndRunWithOutput("helm --kube-context kubecontext get all skaffold-helm-remote --template {{.Release.Manifest}} --kubeconfig kubeconfig", validDeployYaml),
helm: testTreeReleasesWithDependencies,
builds: testBuilds,
expectedNamespaces: []string{""},
},
}

concurrencyCount := 3
for _, test := range tests {
testutil.Run(t, test.description, func(t *testutil.T) {
t.Override(&helm.WriteBuildArtifacts, func([]graph.Artifact) (string, func(), error) { return "TMPFILE", func() {}, nil })
t.Override(&client.Client, deployutil.MockK8sClient)
fakeWarner := &warnings.Collect{}
env := test.env
if env == nil {
env = []string{"FOO=FOOBAR"}
}
t.Override(&warnings.Printf, fakeWarner.Warnf)
t.Override(&util.OSEnviron, func() []string { return env })
t.Override(&util.DefaultExecCommand, test.commands)
t.Override(&helm.OSExecutable, func() (string, error) { return "SKAFFOLD-BINARY", nil })
t.Override(&kubectx.CurrentConfig, func() (api.Config, error) {
return api.Config{CurrentContext: ""}, nil
})

test.helm.Concurrency = &concurrencyCount
deployer, err := NewDeployer(context.Background(), &helmConfig{
namespace: test.namespace,
force: test.force,
configFile: "test.yaml",
}, &label.DefaultLabeller{}, &test.helm, nil, "default", nil)
t.RequireNoError(err)

if test.configure != nil {
test.configure(deployer)
}
deployer.pkgTmpDir = tmpDir
// Deploy returns nil unless `helm get all <release>` is set up to return actual release info
err = deployer.Deploy(context.Background(), io.Discard, test.builds, manifest.ManifestListByConfig{})
t.CheckError(test.shouldErr, err)
t.CheckDeepEqual(test.expectedWarnings, fakeWarner.Warnings)
t.CheckErrorAndDeepEqual(test.shouldErr, err, test.expectedNamespaces, *deployer.namespaces)
})
}
}

func TestHelmDeployConcurrentlyWithCrossDependencyWrongConfig(t *testing.T) {
tmpDir, err := os.MkdirTemp("", "TestHelmDeploy")
if err != nil {
t.Fatalf("tempdir: %v", err)
}

tests := []struct {
description string
commands util.Command
env []string
helm latest.LegacyHelmDeploy
namespace string
configure func(*Deployer)
builds []graph.Artifact
force bool
shouldErr bool
expectedWarnings []string
expectedNamespaces []string
}{
{
description: "Deploy with multiple helm packages with dependency within each other",
commands: concurrency.
CmdRunWithOutput("helm version --client", version31).
AndRun("helm --kube-context kubecontext get all other --kubeconfig kubeconfig").
AndRun("helm --kube-context kubecontext dep build examples/test --kubeconfig kubeconfig").
AndRunEnv("helm --kube-context kubecontext upgrade other examples/test --post-renderer SKAFFOLD-BINARY --kubeconfig kubeconfig",
[]string{"SKAFFOLD_FILENAME=test.yaml", "SKAFFOLD_CMDLINE=filter --kube-context kubecontext --build-artifacts TMPFILE --kubeconfig kubeconfig"}).
AndRun("helm --kube-context kubecontext get all other --template {{.Release.Manifest}} --kubeconfig kubeconfig").
AndRun("helm --kube-context kubecontext get all skaffold-helm --kubeconfig kubeconfig").
AndRun("helm --kube-context kubecontext dep build examples/test --kubeconfig kubeconfig").
AndRun("helm --kube-context kubecontext get all skaffold-helm-remote --kubeconfig kubeconfig").
AndRun("helm --kube-context kubecontext dep build examples/test --kubeconfig kubeconfig").
AndRunEnv("helm --kube-context kubecontext upgrade skaffold-helm examples/test --post-renderer SKAFFOLD-BINARY --kubeconfig kubeconfig",
[]string{"SKAFFOLD_FILENAME=test.yaml", "SKAFFOLD_CMDLINE=filter --kube-context kubecontext --build-artifacts TMPFILE --kubeconfig kubeconfig"}).
AndRunEnv("helm --kube-context kubecontext upgrade skaffold-helm-remote examples/test --post-renderer SKAFFOLD-BINARY --kubeconfig kubeconfig",
[]string{"SKAFFOLD_FILENAME=test.yaml", "SKAFFOLD_CMDLINE=filter --kube-context kubecontext --build-artifacts TMPFILE --kubeconfig kubeconfig"}).
AndRunWithOutput("helm --kube-context kubecontext get all skaffold-helm --template {{.Release.Manifest}} --kubeconfig kubeconfig", validDeployYaml).
AndRunWithOutput("helm --kube-context kubecontext get all skaffold-helm-remote --template {{.Release.Manifest}} --kubeconfig kubeconfig", validDeployYaml),
helm: testTreeReleasesWithDependenciesWrongConfig,
builds: testBuilds,
expectedNamespaces: []string{""},
},
}

concurrencyCount := 3
for _, test := range tests {
testutil.Run(t, test.description, func(t *testutil.T) {
t.Override(&helm.WriteBuildArtifacts, func([]graph.Artifact) (string, func(), error) { return "TMPFILE", func() {}, nil })
t.Override(&client.Client, deployutil.MockK8sClient)
fakeWarner := &warnings.Collect{}
env := test.env
if env == nil {
env = []string{"FOO=FOOBAR"}
}
t.Override(&warnings.Printf, fakeWarner.Warnf)
t.Override(&util.OSEnviron, func() []string { return env })
t.Override(&util.DefaultExecCommand, test.commands)
t.Override(&helm.OSExecutable, func() (string, error) { return "SKAFFOLD-BINARY", nil })
t.Override(&kubectx.CurrentConfig, func() (api.Config, error) {
return api.Config{CurrentContext: ""}, nil
})

test.helm.Concurrency = &concurrencyCount
deployer, err := NewDeployer(context.Background(), &helmConfig{
namespace: test.namespace,
force: test.force,
configFile: "test.yaml",
}, &label.DefaultLabeller{}, &test.helm, nil, "default", nil)
t.RequireNoError(err)

if test.configure != nil {
test.configure(deployer)
}
deployer.pkgTmpDir = tmpDir
// Deploy returns nil unless `helm get all <release>` is set up to return actual release info
err = deployer.Deploy(context.Background(), io.Discard, test.builds, manifest.ManifestListByConfig{})
t.CheckError(test.shouldErr, err)
t.CheckDeepEqual(test.expectedWarnings, fakeWarner.Warnings)
t.CheckErrorAndDeepEqual(test.shouldErr, err, test.expectedNamespaces, *deployer.namespaces)
})
}
}

// testTreeReleasesWithDependenciesWrongConfig

func TestHelmCleanup(t *testing.T) {
tests := []struct {
description string
Expand Down
3 changes: 3 additions & 0 deletions pkg/skaffold/schema/latest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1070,6 +1070,9 @@ type HelmRelease struct {

// Packaged parameters for packaging helm chart (`helm package`).
Packaged *HelmPackaged `yaml:"packaged,omitempty"`

// DependsOn specifies other helm packages that this HelmRelease depends on, ensuring proper ordering during deployment.
DependsOn []string `yaml:"dependsOn,omitempty"`
}

// HelmPackaged parameters for packaging helm chart (`helm package`).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func (c *FakeCmdWithConcurrencySupport) RunCmd(_ context.Context, cmd *exec.Cmd)

r, err := c.popRunWithGivenCommand(command)
if err != nil {
c.t.Fatalf("unable to run RunCmd() with command %q", command)
c.t.Fatalf("unable to run RunCmd() with command %q: %q", command, err)
}

if r.output != nil {
Expand Down
Loading