Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

calico blockaffinity informer watch added for static route support #3715

Open
wants to merge 1 commit into
base: 2.x-master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
3 changes: 3 additions & 0 deletions docs/config_examples/rbac/clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ rules:
- apiGroups: ["config.openshift.io/v1"]
resources: ["network"]
verbs: ["list"]
- apiGroups: [ "crd.projectcalico.org" ]
resources: [ "blockaffinities" ]
verbs: [ "get", "watch", "list" ]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
Expand Down
3 changes: 3 additions & 0 deletions docs/config_examples/rbac/k8s_rbac.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ rules:
- apiGroups: ["config.openshift.io/v1"]
resources: ["network"]
verbs: ["list"]
- apiGroups: [ "crd.projectcalico.org" ]
resources: [ "blockaffinities" ]
verbs: [ "get", "watch", "list" ]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
Expand Down
2 changes: 2 additions & 0 deletions pkg/controller/clusterHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ func (ch *ClusterHandler) addClusterConfig(clusterName string, config *ClusterCo
config.namespaceLabel = ch.namespaceLabel
config.nodeLabelSelector = ch.nodeLabelSelector
config.routeLabel = ch.routeLabel
config.orchestrationCNI = ch.orchestrationCNI
config.staticRoutingMode = ch.staticRoutingMode
config.nativeResourceSelector, _ = createLabelSelector(DefaultNativeResourceLabel)
config.customResourceSelector, _ = createLabelSelector(DefaultCustomResourceLabel)
ch.ClusterConfigs[clusterName] = config
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ const (
CALICO_K8S = "calico-k8s"
CALICO_API_BLOCK_AFFINITIES = "/apis/crd.projectcalico.org/v1/blockaffinities"
CALICONodeIPAnnotation = "projectcalico.org/IPv4Address"
BLOCKAFFINITIES = "blockaffinities"

CommonPartition = "Common"
LocalCluster = ""
)
57 changes: 56 additions & 1 deletion pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package controller
import (
"context"
"fmt"
authv1 "k8s.io/api/authorization/v1"
"k8s.io/client-go/dynamic"
"net/http"
"os"
"strings"
Expand Down Expand Up @@ -173,6 +175,8 @@ func NewController(params Params, startController bool) *Controller {
clusterConfig.nodeLabelSelector = params.NodeLabelSelector
clusterConfig.nativeResourceSelector, _ = createLabelSelector(DefaultNativeResourceLabel)
clusterConfig.customResourceSelector, _ = createLabelSelector(DefaultCustomResourceLabel)
clusterConfig.orchestrationCNI = params.OrchestrationCNI
clusterConfig.staticRoutingMode = params.StaticRoutingMode
switch ctlr.mode {
case OpenShiftMode, KubernetesMode:
clusterConfig.routeLabel = params.RouteLabel
Expand All @@ -189,7 +193,6 @@ func NewController(params Params, startController bool) *Controller {
if ctlr.PoolMemberType == NodePort || ctlr.PoolMemberType == NodePortLocal {
ctlr.shareNodes = true
}

if err := ctlr.setupClientsforCluster(params.Config, params.IPAM, ctlr.mode == OpenShiftMode, ctlr.multiClusterHandler.LocalClusterName, clusterConfig); err != nil {
log.Errorf("Failed to Setup Clients: %v", err)
}
Expand All @@ -198,6 +201,8 @@ func NewController(params Params, startController bool) *Controller {
ctlr.multiClusterHandler.namespaces = params.Namespaces
ctlr.multiClusterHandler.nodeLabelSelector = params.NodeLabelSelector
ctlr.multiClusterHandler.routeLabel = params.RouteLabel
ctlr.multiClusterHandler.staticRoutingMode = params.StaticRoutingMode
ctlr.multiClusterHandler.orchestrationCNI = params.OrchestrationCNI
// add the cluster config for local cluster
ctlr.multiClusterHandler.addClusterConfig(ctlr.multiClusterHandler.LocalClusterName, clusterConfig)

Expand Down Expand Up @@ -420,12 +425,20 @@ func (ctlr *Controller) setupClientsforCluster(config *rest.Config, ipamClient,
}
}

var dynamicClient dynamic.Interface
if clusterConfig.orchestrationCNI == CALICO_K8S && clusterConfig.staticRoutingMode {
dynamicClient, err = dynamic.NewForConfig(config)
if nil != err {
return fmt.Errorf("Failed to create dynamic Client for cluster %s: %v", clusterName, err)
}
}
log.Debugf("Clients Created for cluster: %s", clusterName)
//Update the clusterConfig store
clusterConfig.kubeClient = kubeClient
clusterConfig.kubeCRClient = kubeCRClient
clusterConfig.kubeIPAMClient = kubeIPAMClient
clusterConfig.routeClientV1 = rclient
clusterConfig.dynamicClient = dynamicClient
return nil
}

Expand All @@ -438,6 +451,12 @@ func (ctlr *Controller) setupInformers(clusterName string) error {
}
}
_ = ctlr.setNodeInformer(clusterName)
// create block affinities informer for calico cni enabled clusters.
if clusterConfig.orchestrationCNI == CALICO_K8S && clusterConfig.staticRoutingMode {
if clusterConfig.dynamicClient != nil {
_ = ctlr.newDynamicInformersForCluster(clusterConfig.dynamicClient, clusterName)
}
}
return nil
}

Expand Down Expand Up @@ -492,6 +511,38 @@ func (ctlr *Controller) StartInformers(clusterName string) {
for _, inf := range informerStore.comInformers {
inf.start(ctlr.multiClusterHandler.LocalClusterName, false)
}

//start CNI informers if required
if ctlr.StaticRoutingMode && ctlr.OrchestrationCNI == CALICO_K8S {
// check if role permissions exists for calico crd
roleCheck := authv1.SelfSubjectAccessReview{
Spec: authv1.SelfSubjectAccessReviewSpec{
ResourceAttributes: &authv1.ResourceAttributes{
Resource: BLOCKAFFINITIES,
Verb: "watch",
Group: "crd.projectcalico.org",
Namespace: "",
},
},
}
clusterConfig := ctlr.multiClusterHandler.getClusterConfig(ctlr.multiClusterHandler.LocalClusterName)
if clusterConfig != nil {
resp, err := clusterConfig.kubeClient.AuthorizationV1().
SelfSubjectAccessReviews().
Create(context.TODO(), &roleCheck, metaV1.CreateOptions{})
if err == nil {
if resp.Status.Allowed {
log.Debugf("RBAC present for blockaffinities watch: %v", resp)
informerStore.dynamicInformers.start()
} else {
log.Warning("Role Permissions to watch blockaffinities resource for calico CNI is not provided.Informers are not created for blockaffinities resource.Create proper RBAC for blockaffinities watch for static routing to work properly")
}
} else {
log.Errorf("Failed to create Self Subject Access Review for blockaffinities resource: %v.Skipping informer creation for blockaffinitoes resource", err)
}
}

}
switch ctlr.mode {
case OpenShiftMode, KubernetesMode:
// nrInformers only with openShiftMode
Expand Down Expand Up @@ -528,6 +579,10 @@ func (ctlr *Controller) StopInformers(clusterName string) {
for _, nsInf := range informerStore.nsInformers {
nsInf.stop()
}
// stop cni Informer
if ctlr.StaticRoutingMode && ctlr.OrchestrationCNI == CALICO_K8S {
informerStore.dynamicInformers.stop()
}
// stop node Informer
informerStore.nodeInformer.stop()
}
Expand Down
46 changes: 46 additions & 0 deletions pkg/controller/informers.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"io"
"k8s.io/client-go/dynamic/dynamicinformer"
"reflect"
"time"

Expand All @@ -37,6 +38,7 @@ import (
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/cache"
)

Expand Down Expand Up @@ -1480,6 +1482,20 @@ func (nsInfr *NSInformer) stop() {
close(nsInfr.stopCh)
}

func (dynamicInf *DynamicInformers) start() {
if dynamicInf.CalicoBlockAffinityInformer != nil {
log.Infof("Starting calico block affinity Informer for cluster %v", dynamicInf.clusterName)
go dynamicInf.CalicoBlockAffinityInformer.Informer().Run(dynamicInf.stopCh)
if dynamicInf.CalicoBlockAffinityInformer.Informer().HasSynced() {
log.Debugf("Successfully synced block affinity informer caches for Cluster: %s", dynamicInf.clusterName)
}
}
}

func (dynamicInf *DynamicInformers) stop() {
close(dynamicInf.stopCh)
}

func (nodeInfr *NodeInformer) start(apiServerUnreachable bool) {
var cacheSyncs []cache.InformerSynced
if nodeInfr.nodeInformer != nil {
Expand Down Expand Up @@ -1628,3 +1644,33 @@ func (ctlr *Controller) getErrorHandlerFunc(rsType, clusterName string) func(r *
}
}
}

func (ctlr *Controller) newDynamicInformersForCluster(client dynamic.Interface, clusterName string) *DynamicInformers {
log.Debugf("Creating dynamic Informers for cluster: %s", clusterName)
clusterConfig := ctlr.multiClusterHandler.getClusterConfig(clusterName)
informers := &DynamicInformers{
clusterName: clusterName,
stopCh: make(chan struct{}),
}
dynamicInformerFactory := dynamicinformer.NewDynamicSharedInformerFactory(client, 0)
// if orchestration cni is calico and static routing mode is true,
// create informers for blockaffinities CR.
if ctlr.StaticRoutingMode && ctlr.OrchestrationCNI == CALICO_K8S {
informers.CalicoBlockAffinityInformer = dynamicInformerFactory.ForResource(CalicoBlockaffinity)
}
clusterConfig.InformerStore.dynamicInformers = informers
ctlr.addDynamicResourceEventHandlers(informers)
return informers
}

func (ctlr *Controller) addDynamicResourceEventHandlers(dynamicInf *DynamicInformers) {
if dynamicInf.CalicoBlockAffinityInformer != nil {
dynamicInf.CalicoBlockAffinityInformer.Informer().AddEventHandler(
&cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { ctlr.processBlockAffinities(dynamicInf.clusterName) },
DeleteFunc: func(obj interface{}) { ctlr.processBlockAffinities(dynamicInf.clusterName) },
},
)
dynamicInf.CalicoBlockAffinityInformer.Informer().SetWatchErrorHandler(ctlr.getErrorHandlerFunc(BLOCKAFFINITIES, dynamicInf.clusterName))
}
}
1 change: 1 addition & 0 deletions pkg/controller/multiClusterInformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ func (ctlr *Controller) setupAndStartExternalClusterInformers(clusterName string
log.Errorf("[MultiCluster] unable to setup node informer for cluster: %v, Error: %v", clusterName, err)
return err
}

return nil
}

Expand Down
89 changes: 73 additions & 16 deletions pkg/controller/node_poll_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,13 +295,17 @@ func (ctlr *Controller) processStaticRouteUpdate(
nodePodCIDRMap = ctlr.GetNodePodCIDRMap()
}
if nodeIPValue, ok := node.Annotations[CALICONodeIPAnnotation]; ok {
if cidr, ok := nodePodCIDRMap[node.Name]; ok {
route.Gateway = strings.Split(nodeIPValue, "/")[0]
route.Name = fmt.Sprintf("k8s-%v-%v", node.Name, route.Gateway)
route.Network = cidr
} else {
log.Warningf("Pod Network not found for node %v, static route not added", node.Name)
continue
for _, bacidr := range nodePodCIDRMap {
caRoute := routeConfig{}
if bacidr.nodeName == node.Name {
caRoute.Gateway = strings.Split(nodeIPValue, "/")[0]
caRoute.Name = fmt.Sprintf("k8s-%v", bacidr.baName)
caRoute.Network = bacidr.cidr
} else {
log.Warningf("Pod Network not found for node %v, static route not added", node.Name)
continue
}
routes.Entries = append(routes.Entries, caRoute)
}
} else {
log.Warningf("Host addresses annotation %v not found on node %v ,static route not added", CALICONodeIPAnnotation, node.Name)
Expand All @@ -325,7 +329,9 @@ func (ctlr *Controller) processStaticRouteUpdate(
continue
}
}
routes.Entries = append(routes.Entries, route)
if route != (routeConfig{}) {
routes.Entries = append(routes.Entries, route)
}
}
doneCh, errCh, err := ctlr.Agent.ConfigWriter.SendSection("static-routes", routes)

Expand Down Expand Up @@ -416,14 +422,14 @@ func parseHostCIDRS(ann string, nodenetwork *net.IPNet) (string, error) {
return "", err
}

func (ctlr *Controller) GetNodePodCIDRMap() map[string]string {
var nodePodCIDRMap map[string]string
func (ctlr *Controller) GetNodePodCIDRMap() []BlockAffinitycidr {
var bacidrs []BlockAffinitycidr
if ctlr.OrchestrationCNI == CALICO_K8S {
// Retrieve Calico Block Affinity
blockAffinitiesRaw, err := ctlr.multiClusterHandler.ClusterConfigs[""].kubeClient.Discovery().RESTClient().Get().AbsPath(CALICO_API_BLOCK_AFFINITIES).DoRaw(context.TODO())
blockAffinitiesRaw, err := ctlr.multiClusterHandler.ClusterConfigs[ctlr.multiClusterHandler.LocalClusterName].kubeClient.Discovery().RESTClient().Get().AbsPath(CALICO_API_BLOCK_AFFINITIES).DoRaw(context.TODO())
if err != nil {
log.Warningf("Calico blockaffinity resource not found on the cluster, getting error %v", err)
return nodePodCIDRMap
return bacidrs
}
// Define a map to store the unmarshalled data
var blockAffinities unstructured.UnstructuredList
Expand All @@ -432,14 +438,65 @@ func (ctlr *Controller) GetNodePodCIDRMap() map[string]string {
err = json.Unmarshal(blockAffinitiesRaw, &blockAffinities)
if err != nil {
log.Errorf("Unable to unmarshall block affinity resource %v, getting error %v", string(blockAffinitiesRaw), err)
return nodePodCIDRMap
return bacidrs
}
nodePodCIDRMap = make(map[string]string)
for _, blockAffinity := range blockAffinities.Items {
// Access the spec field from the unstructured object
specData := blockAffinity.Object["spec"].(map[string]interface{})
nodePodCIDRMap[specData["node"].(string)] = specData["cidr"].(string)
bacidr := BlockAffinitycidr{}
bacidr.baName = blockAffinity.Object["metadata"].(map[string]interface{})["name"].(string)
bacidr.nodeName = specData["node"].(string)
bacidr.cidr = specData["cidr"].(string)
bacidrs = append(bacidrs, bacidr)
}
}
return bacidrs
}

func (ctlr *Controller) processBlockAffinities(clusterName string) {
var baListInf []interface{}
if infStore, ok := ctlr.multiClusterHandler.ClusterConfigs[clusterName]; ok {
baListInf = infStore.dynamicInformers.CalicoBlockAffinityInformer.Informer().GetIndexer().List()
}
routes := routeSection{}
for _, obj := range baListInf {
blockAffinity := obj.(*unstructured.Unstructured)
baJSON, found, err := unstructured.NestedStringMap(blockAffinity.UnstructuredContent(), "spec")
if err != nil || !found {
log.Debugf("calico blockaffinity spec not found: %+v", err)
continue
}
baName := blockAffinity.Object["metadata"].(map[string]interface{})["name"]
clusterConfig := ctlr.multiClusterHandler.getClusterConfig(clusterName)
route := routeConfig{}
if clusterConfig != nil {
nodes := clusterConfig.oldNodes
cidr := baJSON["cidr"]
nodeName := baJSON["node"]
//check if node is in watched nodes
for _, node := range nodes {
if node.Name == nodeName {
route.Gateway = node.Addr
route.Name = fmt.Sprintf("k8s-%v", baName)
route.Network = cidr
routes.Entries = append(routes.Entries, route)
break
}
}
}
}
doneCh, errCh, err := ctlr.Agent.ConfigWriter.SendSection("static-routes", routes)

if nil != err {
log.Warningf("Failed to write static routes config section: %v", err)
} else {
select {
case <-doneCh:
log.Debugf("Wrote static route config section: %v", routes)
case e := <-errCh:
log.Warningf("Failed to write static route config section: %v", e)
case <-time.After(time.Second):
log.Warningf("Did not receive write response in 1s")
}
}
return nodePodCIDRMap
}
Loading
Loading