Skip to content

Commit

Permalink
pkg/k8s: replace GetNode instances with a k8s store
Browse files Browse the repository at this point in the history
Polling the k8s node from kube-apiserver to retrieve its state has some
scalability concerns, specially in large clutser. To avoid these
concerns we will start using a k8s node cache which will receive the
node events as they come and will have the most up to date node resource
in its local cache.

As the slim Node structure does not contain all the fields from Core V1
Node, which are essential in case we want to perform an Update of this
retrieved Node into kube-apiserver, this commit switches the usage of
slim Node to kubernetes' core v1.Node. It is safe to perform this
switch since the K8s Node watcher is only watching for a single node,
itself.

Signed-off-by: André Martins <[email protected]>
  • Loading branch information
aanm committed Jul 1, 2021
1 parent b89118b commit a8b23e7
Show file tree
Hide file tree
Showing 11 changed files with 140 additions and 71 deletions.
2 changes: 1 addition & 1 deletion cilium/cmd/preflight_identity_crd_migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func initK8s(ctx context.Context) (crdBackend allocator.Backend, crdAllocator *a
log.WithError(err).Fatal("Unable to connect to Kubernetes apiserver")
}

if err := k8s.WaitForNodeInformation(); err != nil {
if err := k8s.WaitForNodeInformation(ctx, k8s.Client()); err != nil {
log.WithError(err).Fatal("Unable to connect to get node spec from apiserver")
}

Expand Down
14 changes: 11 additions & 3 deletions daemon/cmd/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,7 @@ func NewDaemon(ctx context.Context, cancel context.CancelFunc, epMgr *endpointma
d.egressPolicyManager,
option.Config,
)
nd.RegisterK8sNodeGetter(d.k8sWatcher)

d.redirectPolicyManager.RegisterSvcCache(&d.k8sWatcher.K8sSvcCache)
d.redirectPolicyManager.RegisterGetStores(d.k8sWatcher)
Expand Down Expand Up @@ -565,13 +566,20 @@ func NewDaemon(ctx context.Context, cancel context.CancelFunc, epMgr *endpointma
return nil, restoredEndpoints, err
}

// Launch the K8s node watcher so we can start receiving node events.
// Launching the k8s node watcher at this stage will prevent all agents
// from performing Gets directly into kube-apiserver to get the most up
// to date version of the k8s node. This allows for better scalability
// in large clusters.
d.k8sWatcher.NodesInit(k8s.Client())

if option.Config.IPAM == ipamOption.IPAMClusterPool {
// Create the CiliumNode custom resource. This call will block until
// the custom resource has been created
d.nodeDiscovery.UpdateCiliumNodeResource()
}

if err := k8s.WaitForNodeInformation(); err != nil {
if err := k8s.WaitForNodeInformation(d.ctx, d.k8sWatcher); err != nil {
log.WithError(err).Fatal("Unable to connect to get node spec from apiserver")
}

Expand Down Expand Up @@ -606,10 +614,10 @@ func NewDaemon(ctx context.Context, cancel context.CancelFunc, epMgr *endpointma
handleNativeDevices(isKubeProxyReplacementStrict)
finishKubeProxyReplacementInit(isKubeProxyReplacementStrict)

// Launch the K8s watchers in parallel as we continue to process other
// daemon options.
if k8s.IsEnabled() {
bootstrapStats.k8sInit.Start()
// Launch the K8s watchers in parallel as we continue to process other
// daemon options.
d.k8sCachesSynced = d.k8sWatcher.InitK8sSubsystem(d.ctx)
bootstrapStats.k8sInit.End(true)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/bgp/speaker/speaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (s *Speaker) OnUpdateEndpoints(eps *slim_corev1.Endpoints) {
}

// OnUpdateNode notifies the Speaker of an update to a node.
func (s *Speaker) OnUpdateNode(node *slim_corev1.Node) {
func (s *Speaker) OnUpdateNode(node *v1.Node) {
s.queue.Add(nodeEvent(&node.Labels))
}

Expand Down
12 changes: 11 additions & 1 deletion pkg/k8s/annotate.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2016-2020 Authors of Cilium
// Copyright 2016-2021 Authors of Cilium
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cilium/cilium/pkg/logging/logfields"

"github.com/sirupsen/logrus"
core_v1 "k8s.io/api/core/v1"
apiextclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -137,3 +138,12 @@ func (k8sCli K8sClient) GetSecrets(ctx context.Context, ns, name string) (map[st
}
return result.Data, nil
}

// GetK8sNode returns the node with the given nodeName.
func (k8sCli K8sClient) GetK8sNode(ctx context.Context, nodeName string) (*core_v1.Node, error) {
if k8sCli.Interface == nil {
return nil, fmt.Errorf("GetK8sNode: No k8s, cannot access k8s nodes")
}

return k8sCli.CoreV1().Nodes().Get(ctx, nodeName, v1.GetOptions{})
}
6 changes: 3 additions & 3 deletions pkg/k8s/factory_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ func ObjTov1Pod(obj interface{}) *slim_corev1.Pod {
return nil
}

func ObjToV1Node(obj interface{}) *slim_corev1.Node {
node, ok := obj.(*slim_corev1.Node)
func ObjToV1Node(obj interface{}) *v1.Node {
node, ok := obj.(*v1.Node)
if ok {
return node
}
Expand All @@ -181,7 +181,7 @@ func ObjToV1Node(obj interface{}) *slim_corev1.Node {
// Delete was not observed by the watcher but is
// removed from kube-apiserver. This is the last
// known state and the object no longer exists.
node, ok := deletedObj.Obj.(*slim_corev1.Node)
node, ok := deletedObj.Obj.(*v1.Node)
if ok {
return node
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/k8s/informer/benchmarks/informer_benchmarks_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 Authors of Cilium
// Copyright 2019-2021 Authors of Cilium
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,7 @@ package benchmarks
import (
"encoding/json"
"os"
"reflect"
"strconv"
"sync"
"testing"
Expand Down Expand Up @@ -460,7 +461,7 @@ func (k *K8sIntegrationSuite) benchmarkInformer(nCycles int, newInformer bool, c
UpdateFunc: func(oldObj, newObj interface{}) {
if oldK8sNP := k8s.ObjToV1Node(oldObj); oldK8sNP != nil {
if newK8sNP := k8s.ObjToV1Node(newObj); newK8sNP != nil {
if oldK8sNP.DeepEqual(newK8sNP) {
if reflect.DeepEqual(oldK8sNP, newK8sNP) {
return
}
}
Expand Down
24 changes: 15 additions & 9 deletions pkg/k8s/init.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2016-2020 Authors of Cilium
// Copyright 2016-2021 Authors of Cilium
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -34,14 +34,19 @@ import (
"github.com/cilium/cilium/pkg/source"

"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
nodeRetrievalMaxRetries = 15
)

func waitForNodeInformation(ctx context.Context, nodeName string) *nodeTypes.Node {
type nodeGetter interface {
GetK8sNode(ctx context.Context, nodeName string) (*corev1.Node, error)
}

func waitForNodeInformation(ctx context.Context, nodeGetter nodeGetter, nodeName string) *nodeTypes.Node {
backoff := backoff.Exponential{
Min: time.Duration(200) * time.Millisecond,
Max: 2 * time.Minute,
Expand All @@ -50,7 +55,7 @@ func waitForNodeInformation(ctx context.Context, nodeName string) *nodeTypes.Nod
}

for retry := 0; retry < nodeRetrievalMaxRetries; retry++ {
n, err := retrieveNodeInformation(nodeName)
n, err := retrieveNodeInformation(ctx, nodeGetter, nodeName)
if err != nil {
log.WithError(err).Warning("Waiting for k8s node information")
backoff.Wait(ctx)
Expand All @@ -63,7 +68,7 @@ func waitForNodeInformation(ctx context.Context, nodeName string) *nodeTypes.Nod
return nil
}

func retrieveNodeInformation(nodeName string) (*nodeTypes.Node, error) {
func retrieveNodeInformation(ctx context.Context, nodeGetter nodeGetter, nodeName string) (*nodeTypes.Node, error) {
requireIPv4CIDR := option.Config.K8sRequireIPv4PodCIDR
requireIPv6CIDR := option.Config.K8sRequireIPv6PodCIDR
// At this point it's not clear whether the device auto-detection will
Expand All @@ -74,7 +79,7 @@ func retrieveNodeInformation(nodeName string) (*nodeTypes.Node, error) {
var n *nodeTypes.Node

if option.Config.IPAM == ipamOption.IPAMClusterPool {
ciliumNode, err := CiliumClient().CiliumV2().CiliumNodes().Get(context.TODO(), nodeName, v1.GetOptions{})
ciliumNode, err := CiliumClient().CiliumV2().CiliumNodes().Get(ctx, nodeName, v1.GetOptions{})
if err != nil {
// If no CIDR is required, retrieving the node information is
// optional
Expand All @@ -89,7 +94,7 @@ func retrieveNodeInformation(nodeName string) (*nodeTypes.Node, error) {
n = &no
log.WithField(logfields.NodeName, n.Name).Info("Retrieved node information from cilium node")
} else {
k8sNode, err := GetNode(Client(), nodeName)
k8sNode, err := nodeGetter.GetK8sNode(ctx, nodeName)
if err != nil {
// If no CIDR is required, retrieving the node information is
// optional
Expand Down Expand Up @@ -195,8 +200,9 @@ func Init(conf k8sconfig.Configuration) error {

// WaitForNodeInformation retrieves the node information via the CiliumNode or
// Kubernetes Node resource. This function will block until the information is
// received.
func WaitForNodeInformation() error {
// received. nodeGetter is a function used to retrieved the node from either
// the kube-apiserver or a local cache, depending on the caller.
func WaitForNodeInformation(ctx context.Context, nodeGetter nodeGetter) error {
// Use of the environment variable overwrites the node-name
// automatically derived
nodeName := nodeTypes.GetName()
Expand All @@ -210,7 +216,7 @@ func WaitForNodeInformation() error {
return nil
}

if n := waitForNodeInformation(context.TODO(), nodeName); n != nil {
if n := waitForNodeInformation(ctx, nodeGetter, nodeName); n != nil {
nodeIP4 := n.GetNodeIP(false)
nodeIP6 := n.GetNodeIP(true)

Expand Down
7 changes: 0 additions & 7 deletions pkg/k8s/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,13 +209,6 @@ func ParseNode(k8sNode *slim_corev1.Node, source source.Source) *nodeTypes.Node
return newNode
}

// GetNode returns the kubernetes nodeName's node information from the
// kubernetes api server
func GetNode(c kubernetes.Interface, nodeName string) (*corev1.Node, error) {
// Try to retrieve node's cidr and addresses from k8s's configuration
return c.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
}

// setNodeNetworkUnavailableFalse sets Kubernetes NodeNetworkUnavailable to
// false as Cilium is managing the network connectivity.
// https://kubernetes.io/docs/concepts/architecture/nodes/#condition
Expand Down
116 changes: 77 additions & 39 deletions pkg/k8s/watchers/node.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2020 Authors of Cilium
// Copyright 2020-2021 Authors of Cilium
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -15,64 +15,81 @@
package watchers

import (
"context"
"sync"

"github.com/cilium/cilium/pkg/comparator"
"github.com/cilium/cilium/pkg/k8s"
"github.com/cilium/cilium/pkg/k8s/informer"
slim_corev1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/core/v1"
"github.com/cilium/cilium/pkg/node"
nodeTypes "github.com/cilium/cilium/pkg/node/types"
"github.com/cilium/cilium/pkg/option"

v1 "k8s.io/api/core/v1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)

func (k *K8sWatcher) nodesInit(k8sClient kubernetes.Interface) {
_, nodeController := informer.NewInformer(
cache.NewListWatchFromClient(k8sClient.CoreV1().RESTClient(),
"nodes", v1.NamespaceAll, fields.ParseSelectorOrDie("metadata.name="+nodeTypes.GetName())),
&slim_corev1.Node{},
0,
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
var valid bool
if node := k8s.ObjToV1Node(obj); node != nil {
valid = true
err := k.updateK8sNodeV1(nil, node)
k.K8sEventProcessed(metricNode, metricCreate, err == nil)
}
k.K8sEventReceived(metricNode, metricCreate, valid, false)
},
UpdateFunc: func(oldObj, newObj interface{}) {
var valid, equal bool
if oldNode := k8s.ObjToV1Node(oldObj); oldNode != nil {
valid = true
if newNode := k8s.ObjToV1Node(newObj); newNode != nil {
oldNodeLabels := oldNode.GetLabels()
newNodeLabels := newNode.GetLabels()
if comparator.MapStringEquals(oldNodeLabels, newNodeLabels) {
equal = true
} else {
err := k.updateK8sNodeV1(oldNode, newNode)
k.K8sEventProcessed(metricNode, metricUpdate, err == nil)
var (
// onceNodeInitStart is used to guarantee that only one function call of
// NodesInit is executed.
onceNodeInitStart sync.Once
)

// NodesInit initializes a k8s watcher for node events belonging to the node
// where Cilium is running.
func (k *K8sWatcher) NodesInit(k8sClient kubernetes.Interface) {
onceNodeInitStart.Do(func() {
nodeStore, nodeController := informer.NewInformer(
cache.NewListWatchFromClient(k8sClient.CoreV1().RESTClient(),
"nodes", v1.NamespaceAll, fields.ParseSelectorOrDie("metadata.name="+nodeTypes.GetName())),
&v1.Node{},
0,
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
var valid bool
if node := k8s.ObjToV1Node(obj); node != nil {
valid = true
err := k.updateK8sNodeV1(nil, node)
k.K8sEventProcessed(metricNode, metricCreate, err == nil)
}
k.K8sEventReceived(metricNode, metricCreate, valid, false)
},
UpdateFunc: func(oldObj, newObj interface{}) {
var valid, equal bool
if oldNode := k8s.ObjToV1Node(oldObj); oldNode != nil {
valid = true
if newNode := k8s.ObjToV1Node(newObj); newNode != nil {
oldNodeLabels := oldNode.GetLabels()
newNodeLabels := newNode.GetLabels()
if comparator.MapStringEquals(oldNodeLabels, newNodeLabels) {
equal = true
} else {
err := k.updateK8sNodeV1(oldNode, newNode)
k.K8sEventProcessed(metricNode, metricUpdate, err == nil)
}
}
}
}
k.K8sEventReceived(metricNode, metricUpdate, valid, equal)
k.K8sEventReceived(metricNode, metricUpdate, valid, equal)
},
},
},
nil,
)
nil,
)

k.nodeStore = nodeStore

k.blockWaitGroupToSyncResources(wait.NeverStop, nil, nodeController.HasSynced, k8sAPIGroupNodeV1Core)
go nodeController.Run(wait.NeverStop)
k.k8sAPIGroups.AddAPI(k8sAPIGroupNodeV1Core)
k.blockWaitGroupToSyncResources(wait.NeverStop, nil, nodeController.HasSynced, k8sAPIGroupNodeV1Core)
go nodeController.Run(wait.NeverStop)
k.k8sAPIGroups.AddAPI(k8sAPIGroupNodeV1Core)
})
}

func (k *K8sWatcher) updateK8sNodeV1(oldK8sNode, newK8sNode *slim_corev1.Node) error {
func (k *K8sWatcher) updateK8sNodeV1(oldK8sNode, newK8sNode *v1.Node) error {
var oldNodeLabels map[string]string
if oldK8sNode != nil {
oldNodeLabels = oldK8sNode.GetLabels()
Expand All @@ -96,3 +113,24 @@ func (k *K8sWatcher) updateK8sNodeV1(oldK8sNode, newK8sNode *slim_corev1.Node) e
}
return nil
}

// GetK8sNode returns the *local Node* from the local store.
func (k *K8sWatcher) GetK8sNode(_ context.Context, nodeName string) (*v1.Node, error) {
k.WaitForCacheSync(k8sAPIGroupNodeV1Core)
pName := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: nodeName,
},
}
nodeInterface, exists, err := k.nodeStore.Get(pName)
if err != nil {
return nil, err
}
if !exists {
return nil, k8sErrors.NewNotFound(schema.GroupResource{
Group: "core",
Resource: "Node",
}, nodeName)
}
return nodeInterface.(*v1.Node).DeepCopy(), nil
}
Loading

0 comments on commit a8b23e7

Please sign in to comment.