Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated backport of #3222: Allow the gateway to wait for node readiness #3225

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
submarinerClientset "github.com/submariner-io/submariner/pkg/client/clientset/versioned"
"github.com/submariner-io/submariner/pkg/gateway"
"github.com/submariner-io/submariner/pkg/natdiscovery"
"github.com/submariner-io/submariner/pkg/node"
"github.com/submariner-io/submariner/pkg/types"
"github.com/submariner-io/submariner/pkg/versions"
"golang.org/x/net/http/httpproxy"
Expand Down Expand Up @@ -123,6 +124,14 @@ func main() {

logger.FatalOnError(subv1.AddToScheme(scheme.Scheme), "Error adding submariner types to the scheme")

ctx := signals.SetupSignalHandler()

if submSpec.WaitForNode {
node.WaitForLocalNodeReady(ctx, k8sClient)

return
}

gw, err := gateway.New(&gateway.Config{
LeaderElectionConfig: gateway.LeaderElectionConfig{
LeaseDuration: time.Duration(gwLeadershipConfig.LeaseDuration) * time.Second,
Expand All @@ -146,7 +155,7 @@ func main() {
})
logger.FatalOnError(err, "Error creating gateway instance")

err = gw.Run(signals.SetupSignalHandler())
err = gw.Run(ctx)

logger.FatalOnError(err, "Error running the gateway")
}
30 changes: 28 additions & 2 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,19 @@ import (

"github.com/pkg/errors"
"github.com/submariner-io/admiral/pkg/log"
"github.com/submariner-io/admiral/pkg/resource"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/retry"
nodeutil "k8s.io/component-helpers/node/util"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)

var logger = log.Logger{Logger: logf.Log.WithName("Node")}

var nodeRetry = wait.Backoff{
var Retry = wait.Backoff{
Steps: 5,
Duration: 5 * time.Second,
Factor: 1.2,
Expand All @@ -50,7 +52,7 @@ func GetLocalNode(clientset kubernetes.Interface) (*v1.Node, error) {

var node *v1.Node

err := retry.OnError(nodeRetry, func(err error) bool {
err := retry.OnError(Retry, func(err error) bool {
logger.Warningf("Error reading the local node - retrying: %v", err)
return true
}, func() error {
Expand All @@ -66,3 +68,27 @@ func GetLocalNode(clientset kubernetes.Interface) (*v1.Node, error) {

return node, errors.Wrapf(err, "failed to get local node %q", nodeName)
}

func WaitForLocalNodeReady(ctx context.Context, client kubernetes.Interface) {
// In most cases the node will already be ready; otherwise, wait forever or until the context is cancelled.
err := wait.PollUntilContextCancel(ctx, time.Second, true, func(_ context.Context) (bool, error) {
localNode, err := GetLocalNode(client) //nolint:contextcheck // TODO - should pass the context parameter

if err != nil {
logger.Error(err, "Error retrieving local node")
} else {
_, condition := nodeutil.GetNodeCondition(&localNode.Status, v1.NodeReady)
if condition != nil && condition.Status == v1.ConditionTrue {
logger.Info("Local node ready")
return true, nil
}

logger.Infof("Local node not ready - waiting. Conditions: %s", resource.ToJSON(localNode.Status.Conditions))
}

return false, nil
})
if err != nil {
logger.Error(err, "Error waiting for local node")
}
}
40 changes: 40 additions & 0 deletions pkg/node/node_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
SPDX-License-Identifier: Apache-2.0

Copyright Contributors to the Submariner project.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package node_test

import (
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/submariner-io/admiral/pkg/log/kzerolog"
)

func init() {
kzerolog.AddFlags(nil)
}

var _ = BeforeSuite(func() {
kzerolog.InitK8sLogging()
})

func TestNode(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Node Suite")
}
157 changes: 157 additions & 0 deletions pkg/node/node_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
SPDX-License-Identifier: Apache-2.0

Copyright Contributors to the Submariner project.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package node_test

import (
"context"
"os"
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/submariner-io/admiral/pkg/fake"
"github.com/submariner-io/submariner/pkg/node"
corev1 "k8s.io/api/core/v1"
v1meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
fakeK8s "k8s.io/client-go/kubernetes/fake"
nodeutil "k8s.io/component-helpers/node/util"
)

const localNodeName = "local-node"

var _ = Describe("GetLocalNode", func() {
t := newTestDriver()

When("the local Node resource exists", func() {
It("should return the resource", func() {
Expect(node.GetLocalNode(t.client)).To(Equal(t.node))
})
})

When("the local Node resource does not exist", func() {
BeforeEach(func() {
t.initialObjs = []runtime.Object{}
})

It("should return an error", func() {
_, err := node.GetLocalNode(t.client)
Expect(err).To(HaveOccurred())
})
})

When("the local Node retrieval initially fails", func() {
JustBeforeEach(func() {
fake.FailOnAction(&t.client.Fake, "nodes", "get", nil, true)
})

It("should eventually return the resource", func() {
Expect(node.GetLocalNode(t.client)).To(Equal(t.node))
})
})

When("the NODE_NAME env var isn't set", func() {
BeforeEach(func() {
os.Unsetenv("NODE_NAME")
})

It("should return an error", func() {
_, err := node.GetLocalNode(t.client)
Expect(err).To(HaveOccurred())
})
})
})

var _ = Describe("WaitForLocalNodeReady", func() {
t := newTestDriver()

var (
cancel context.CancelFunc
completed chan struct{}
)

JustBeforeEach(func() {
var ctx context.Context

ctx, cancel = context.WithCancel(context.Background())
completed = make(chan struct{}, 1)

go func() {
node.WaitForLocalNodeReady(ctx, t.client)
close(completed)
}()

DeferCleanup(cancel)

Consistently(completed).ShouldNot(BeClosed())
})

When("the local Node becomes ready", func() {
It("should return", func() {
Expect(nodeutil.SetNodeCondition(t.client, localNodeName, corev1.NodeCondition{
Type: corev1.NodeReady,
Status: corev1.ConditionTrue,
})).To(Succeed())

Eventually(completed, 3*time.Second).Should(BeClosed())
})
})

When("the context is cancelled", func() {
It("should return", func() {
cancel()

Eventually(completed, 3*time.Second).Should(BeClosed())
})
})
})

type testDriver struct {
client *fakeK8s.Clientset
node *corev1.Node
initialObjs []runtime.Object
}

func newTestDriver() *testDriver {
t := &testDriver{}

BeforeEach(func() {
node.Retry = wait.Backoff{
Steps: 2,
Duration: 10 * time.Millisecond,
}

t.node = &corev1.Node{
ObjectMeta: v1meta.ObjectMeta{
Name: localNodeName,
},
}

t.initialObjs = []runtime.Object{t.node}

os.Setenv("NODE_NAME", localNodeName)
})

JustBeforeEach(func() {
t.client = fakeK8s.NewSimpleClientset(t.initialObjs...)
})

return t
}
30 changes: 4 additions & 26 deletions pkg/routeagent_driver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"io/fs"
"os"
"strconv"
"time"

"github.com/kelseyhightower/envconfig"
"github.com/pkg/errors"
Expand All @@ -49,12 +48,10 @@ import (
"github.com/submariner-io/submariner/pkg/routeagent_driver/handlers/mtu"
"github.com/submariner-io/submariner/pkg/routeagent_driver/handlers/ovn"
"github.com/submariner-io/submariner/pkg/versions"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/clientcmd"
nodeutil "k8s.io/component-helpers/node/util"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager/signals"
)
Expand Down Expand Up @@ -82,7 +79,7 @@ func main() {

logger.Info("Starting submariner-route-agent using the event framework")
// set up signals so we handle the first shutdown signal gracefully
stopCh := signals.SetupSignalHandler().Done()
ctx := signals.SetupSignalHandler()

// Clean up "sockets" created as directories by previous versions
removeInvalidSockets()
Expand Down Expand Up @@ -111,7 +108,7 @@ func main() {
logger.FatalOnError(err, "Error building the REST mapper")

if env.WaitForNode {
waitForNodeReady(k8sClientSet)
node.WaitForLocalNodeReady(ctx, k8sClientSet)

return
}
Expand Down Expand Up @@ -163,6 +160,8 @@ func main() {
})
logger.FatalOnError(err, "Error creating controller for event handling")

stopCh := ctx.Done()

err = ctl.Start(stopCh)
logger.FatalOnError(err, "Error starting controller")

Expand Down Expand Up @@ -211,27 +210,6 @@ func uninstall(registry *event.Registry) {
}
}

func waitForNodeReady(k8sClientSet *kubernetes.Clientset) {
// In most cases the node will already be ready; otherwise, wait for ever
for {
localNode, err := node.GetLocalNode(k8sClientSet)

if err != nil {
logger.Error(err, "Error retrieving local node")
} else if localNode != nil {
_, condition := nodeutil.GetNodeCondition(&localNode.Status, corev1.NodeReady)
if condition != nil && condition.Status == corev1.ConditionTrue {
logger.Info("Node ready")
return
}

logger.Infof("Node not ready, waiting: %v", localNode.Status)
}

time.Sleep(1 * time.Second)
}
}

func removeInvalidSockets() {
// This can be removed once we stop supporting upgrades from 0.16.0 or older
for _, dir := range []string{"/run/openvswitch/db.sock", "/var/run/openvswitch/ovnnb_db.sock", "/var/run/ovn-ic/ovnnb_db.sock"} {
Expand Down
1 change: 1 addition & 0 deletions pkg/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type SubmarinerSpecification struct {
HealthCheckEnabled bool `default:"true"`
Uninstall bool
HaltOnCertError bool `split_words:"true"`
WaitForNode bool
HealthCheckInterval uint
HealthCheckMaxPacketLossCount uint
MetricsPort int `default:"32780"`
Expand Down
Loading