diff --git a/pkg/routeagent_driver/handlers/ovn/handler.go b/pkg/routeagent_driver/handlers/ovn/handler.go index 82bae7d78..9e9097dff 100644 --- a/pkg/routeagent_driver/handlers/ovn/handler.go +++ b/pkg/routeagent_driver/handlers/ovn/handler.go @@ -43,14 +43,15 @@ import ( type NewOVSDBClientFn func(_ model.ClientDBModel, _ ...libovsdbclient.Option) (libovsdbclient.Client, error) type HandlerConfig struct { - Namespace string - ClusterCIDR []string - ServiceCIDR []string - SubmClient clientset.Interface - K8sClient kubernetes.Interface - DynClient dynamic.Interface - WatcherConfig *watcher.Config - NewOVSDBClient NewOVSDBClientFn + Namespace string + ClusterCIDR []string + ServiceCIDR []string + SubmClient clientset.Interface + K8sClient kubernetes.Interface + DynClient dynamic.Interface + WatcherConfig *watcher.Config + NewOVSDBClient NewOVSDBClientFn + TransitSwitchIP TransitSwitchIPGetter } type Handler struct { @@ -124,7 +125,7 @@ func (ovn *Handler) Init() error { return err } - nonGatewayRouteController, err := NewNonGatewayRouteController(*ovn.WatcherConfig, connectionHandler, ovn.K8sClient, ovn.Namespace) + nonGatewayRouteController, err := NewNonGatewayRouteController(*ovn.WatcherConfig, connectionHandler, ovn.Namespace, ovn.TransitSwitchIP) if err != nil { return err } diff --git a/pkg/routeagent_driver/handlers/ovn/handler_test.go b/pkg/routeagent_driver/handlers/ovn/handler_test.go index 119f8eac0..4565fb17e 100644 --- a/pkg/routeagent_driver/handlers/ovn/handler_test.go +++ b/pkg/routeagent_driver/handlers/ovn/handler_test.go @@ -51,7 +51,10 @@ const ( var _ = Describe("Handler", func() { t := newTestDriver() - var ovsdbClient *fakeovn.OVSDBClient + var ( + ovsdbClient *fakeovn.OVSDBClient + transitSwitchIP ovn.TransitSwitchIP + ) BeforeEach(func() { ovsdbClient = fakeovn.NewOVSDBClient() @@ -79,6 +82,8 @@ var _ = Describe("Handler", func() { restMapper := test.GetRESTMapperFor(&submarinerv1.GatewayRoute{}, &submarinerv1.NonGatewayRoute{}) + transitSwitchIP = ovn.NewTransitSwitchIP() + t.Start(ovn.NewHandler(&ovn.HandlerConfig{ Namespace: testing.Namespace, ClusterCIDR: []string{clusterCIDR}, @@ -93,9 +98,11 @@ var _ = Describe("Handler", func() { NewOVSDBClient: func(_ model.ClientDBModel, _ ...libovsdbclient.Option) (libovsdbclient.Client, error) { return ovsdbClient, nil }, + TransitSwitchIP: transitSwitchIP, })) Expect(ovsdbClient.Connected()).To(BeTrue()) + Expect(transitSwitchIP.Init(t.k8sClient)).To(Succeed()) }) When("a remote Endpoint is created, updated, and deleted", func() { @@ -254,37 +261,89 @@ var _ = Describe("Handler", func() { }) }) - When("a NonGatewayRoute is created and deleted", func() { + When("NonGatewayRoutes are created, updated and deleted", func() { + verifyLogicalRouterPolicies := func(ngr *submarinerv1.NonGatewayRoute, nextHop string) { + for _, cidr := range ngr.RoutePolicySpec.RemoteCIDRs { + ovsdbClient.AwaitModel(&nbdb.LogicalRouterPolicy{ + Match: cidr, + Nexthop: ptr.To(nextHop), + }) + } + } + + verifyNoLogicalRouterPolicies := func(ngr *submarinerv1.NonGatewayRoute, nextHop string) { + for _, cidr := range ngr.RoutePolicySpec.RemoteCIDRs { + ovsdbClient.AwaitNoModel(&nbdb.LogicalRouterPolicy{ + Match: cidr, + Nexthop: ptr.To(nextHop), + }) + } + } + It("should correctly reconcile OVN router policies", func() { client := t.dynClient.Resource(submarinerv1.SchemeGroupVersion.WithResource("nongatewayroutes")).Namespace(testing.Namespace) - nonGWRoute := &submarinerv1.NonGatewayRoute{ + By("Creating first NonGatewayRoute") + + nextHop := "172.1.1.1" + + nonGWRoute1 := &submarinerv1.NonGatewayRoute{ ObjectMeta: metav1.ObjectMeta{ - Name: "test-nongateway-route", + Name: "test-nongateway-route1", }, RoutePolicySpec: submarinerv1.RoutePolicySpec{ - NextHops: []string{"111.1.1.1"}, - RemoteCIDRs: []string{"192.0.1.0/24", "192.0.2.0/24"}, + NextHops: []string{nextHop}, + RemoteCIDRs: []string{"111.0.1.0/24", "111.0.2.0/24"}, }, } - test.CreateResource(client, nonGWRoute) + test.CreateResource(client, nonGWRoute1) - for _, cidr := range nonGWRoute.RoutePolicySpec.RemoteCIDRs { - ovsdbClient.AwaitModel(&nbdb.LogicalRouterPolicy{ - Match: cidr, - Nexthop: ptr.To(nonGWRoute.RoutePolicySpec.NextHops[0]), - }) - } + verifyLogicalRouterPolicies(nonGWRoute1, nextHop) - Expect(client.Delete(context.Background(), nonGWRoute.Name, metav1.DeleteOptions{})).To(Succeed()) + By("Creating second NonGatewayRoute") - for _, cidr := range nonGWRoute.RoutePolicySpec.RemoteCIDRs { - ovsdbClient.AwaitNoModel(&nbdb.LogicalRouterPolicy{ - Match: cidr, - Nexthop: ptr.To(nonGWRoute.RoutePolicySpec.NextHops[0]), - }) + nonGWRoute2 := &submarinerv1.NonGatewayRoute{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-nongateway-route2", + }, + RoutePolicySpec: submarinerv1.RoutePolicySpec{ + NextHops: []string{nextHop}, + RemoteCIDRs: []string{"222.0.1.0/24", "222.0.2.0/24"}, + }, } + + test.CreateResource(client, nonGWRoute2) + + verifyLogicalRouterPolicies(nonGWRoute1, nextHop) + verifyLogicalRouterPolicies(nonGWRoute2, nextHop) + + By("Updating NextHop for first NonGatewayRoute") + + prevNextHop := nextHop + nextHop = "172.1.1.2" + nonGWRoute1.RoutePolicySpec.NextHops[0] = nextHop + + test.UpdateResource(client, nonGWRoute1) + + verifyLogicalRouterPolicies(nonGWRoute1, nextHop) + verifyNoLogicalRouterPolicies(nonGWRoute1, prevNextHop) + verifyNoLogicalRouterPolicies(nonGWRoute2, prevNextHop) + + By("Updating NextHop for second NonGatewayRoute") + + nonGWRoute2.RoutePolicySpec.NextHops[0] = nextHop + + test.UpdateResource(client, nonGWRoute2) + + verifyLogicalRouterPolicies(nonGWRoute1, nextHop) + verifyLogicalRouterPolicies(nonGWRoute2, nextHop) + + By("Deleting first NonGatewayRoute") + + Expect(client.Delete(context.Background(), nonGWRoute1.Name, metav1.DeleteOptions{})).To(Succeed()) + + verifyNoLogicalRouterPolicies(nonGWRoute1, nextHop) }) }) diff --git a/pkg/routeagent_driver/handlers/ovn/non_gateway_route_controller.go b/pkg/routeagent_driver/handlers/ovn/non_gateway_route_controller.go index 5179d4996..94dcf1794 100644 --- a/pkg/routeagent_driver/handlers/ovn/non_gateway_route_controller.go +++ b/pkg/routeagent_driver/handlers/ovn/non_gateway_route_controller.go @@ -22,10 +22,8 @@ import ( "github.com/pkg/errors" "github.com/submariner-io/admiral/pkg/watcher" submarinerv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1" - nodeutil "github.com/submariner-io/submariner/pkg/node" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/sets" - clientset "k8s.io/client-go/kubernetes" ) type NonGatewayRouteController struct { @@ -33,12 +31,12 @@ type NonGatewayRouteController struct { connectionHandler *ConnectionHandler remoteSubnets sets.Set[string] stopCh chan struct{} - transitSwitchIP string + transitSwitchIP TransitSwitchIPGetter } //nolint:gocritic // Ignore hugeParam func NewNonGatewayRouteController(config watcher.Config, connectionHandler *ConnectionHandler, - k8sClientSet clientset.Interface, namespace string, + namespace string, transitSwitchIP TransitSwitchIPGetter, ) (*NonGatewayRouteController, error) { // We'll panic if config is nil, this is intentional var err error @@ -47,6 +45,7 @@ func NewNonGatewayRouteController(config watcher.Config, connectionHandler *Conn connectionHandler: connectionHandler, remoteSubnets: sets.New[string](), stopCh: make(chan struct{}), + transitSwitchIP: transitSwitchIP, } config.ResourceConfigs = []watcher.ResourceConfig{ @@ -62,25 +61,6 @@ func NewNonGatewayRouteController(config watcher.Config, connectionHandler *Conn }, } - node, err := nodeutil.GetLocalNode(k8sClientSet) - if err != nil { - return nil, errors.Wrap(err, "error getting the local node info") - } - - annotations := node.GetAnnotations() - - transitSwitchIP := annotations["k8s.ovn.org/node-transit-switch-port-ifaddr"] - if transitSwitchIP == "" { - // This is a non-IC setup , so this controller will not be started. - logger.Infof("No transit switch IP configured on node %q", node.Name) - return controller, nil - } - - controller.transitSwitchIP, err = jsonToIP(transitSwitchIP) - if err != nil { - return nil, errors.Wrapf(err, "error parsing transit switch IP") - } - controller.nonGatewayRouteWatcher, err = watcher.New(&config) if err != nil { return nil, errors.Wrap(err, "error creating resource watcher") @@ -129,7 +109,7 @@ func (g *NonGatewayRouteController) reconcileRemoteSubnets(submNonGWRoute *subma } // If this node belongs to same zone as gateway node, ignore the event. - if submNonGWRoute.RoutePolicySpec.NextHops[0] != g.transitSwitchIP { + if submNonGWRoute.RoutePolicySpec.NextHops[0] != g.transitSwitchIP.Get() { for _, subnet := range submNonGWRoute.RoutePolicySpec.RemoteCIDRs { if addSubnet { g.remoteSubnets.Insert(subnet) @@ -145,7 +125,5 @@ func (g *NonGatewayRouteController) reconcileRemoteSubnets(submNonGWRoute *subma } func (g *NonGatewayRouteController) stop() { - if g.transitSwitchIP != "" { - close(g.stopCh) - } + close(g.stopCh) } diff --git a/pkg/routeagent_driver/handlers/ovn/non_gateway_route_handler.go b/pkg/routeagent_driver/handlers/ovn/non_gateway_route_handler.go index 50056909e..24992cee9 100644 --- a/pkg/routeagent_driver/handlers/ovn/non_gateway_route_handler.go +++ b/pkg/routeagent_driver/handlers/ovn/non_gateway_route_handler.go @@ -28,47 +28,32 @@ import ( submarinerClientset "github.com/submariner-io/submariner/pkg/client/clientset/versioned" "github.com/submariner-io/submariner/pkg/cni" "github.com/submariner-io/submariner/pkg/event" - nodeutil "github.com/submariner-io/submariner/pkg/node" - "github.com/submariner-io/submariner/pkg/routeagent_driver/constants" + corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes" ) type NonGatewayRouteHandler struct { event.HandlerBase + event.NodeHandlerBase smClient submarinerClientset.Interface - k8sClient clientset.Interface - transitSwitchIP string + k8sClient kubernetes.Interface + transitSwitchIP TransitSwitchIP } -func NewNonGatewayRouteHandler(smClient submarinerClientset.Interface, k8sClient clientset.Interface) *NonGatewayRouteHandler { +func NewNonGatewayRouteHandler(smClient submarinerClientset.Interface, k8sClient kubernetes.Interface, transitSwitchIP TransitSwitchIP, +) *NonGatewayRouteHandler { return &NonGatewayRouteHandler{ - smClient: smClient, - k8sClient: k8sClient, + smClient: smClient, + k8sClient: k8sClient, + transitSwitchIP: transitSwitchIP, } } func (h *NonGatewayRouteHandler) Init() error { logger.Info("Starting NonGatewayRouteHandler") - - node, err := nodeutil.GetLocalNode(h.k8sClient) - if err != nil { - return errors.Wrap(err, "error getting the g/w node") - } - - annotations := node.GetAnnotations() - - // TODO transitSwitchIP changes support needs to be added. - transitSwitchIP, ok := annotations[constants.OvnTransitSwitchIPAnnotation] - if !ok { - logger.Infof("No transit switch IP configured") - return nil - } - - h.transitSwitchIP, err = jsonToIP(transitSwitchIP) - - return errors.Wrapf(err, "error parsing the transit switch IP") + return errors.Wrap(h.transitSwitchIP.Init(h.k8sClient), "error initializing TransitSwitchIP") } func (h *NonGatewayRouteHandler) GetName() string { @@ -80,7 +65,7 @@ func (h *NonGatewayRouteHandler) GetNetworkPlugins() []string { } func (h *NonGatewayRouteHandler) RemoteEndpointCreated(endpoint *submarinerv1.Endpoint) error { - if !h.State().IsOnGateway() || h.transitSwitchIP == "" { + if !h.State().IsOnGateway() || h.transitSwitchIP.Get() == "" { return nil } @@ -98,7 +83,7 @@ func (h *NonGatewayRouteHandler) RemoteEndpointCreated(endpoint *submarinerv1.En } func (h *NonGatewayRouteHandler) RemoteEndpointRemoved(endpoint *submarinerv1.Endpoint) error { - if !h.State().IsOnGateway() || h.transitSwitchIP == "" { + if !h.State().IsOnGateway() || h.transitSwitchIP.Get() == "" { return nil } @@ -113,7 +98,7 @@ func (h *NonGatewayRouteHandler) RemoteEndpointRemoved(endpoint *submarinerv1.En } func (h *NonGatewayRouteHandler) TransitionToGateway() error { - if h.transitSwitchIP == "" { + if h.transitSwitchIP.Get() == "" { return nil } @@ -141,6 +126,38 @@ func (h *NonGatewayRouteHandler) TransitionToGateway() error { return nil } +func (h *NonGatewayRouteHandler) NodeUpdated(node *corev1.Node) error { + updated, err := h.transitSwitchIP.UpdateFrom(node) + if err != nil { + logger.Errorf(err, "Error updating transit switch IP from node: %s", resource.ToJSON(node)) + return nil + } + + if !updated { + return nil + } + + logger.Infof("Transit switch IP updated to %s", h.transitSwitchIP.Get()) + + if !h.State().IsOnGateway() { + return nil + } + + endpoints := h.State().GetRemoteEndpoints() + for i := range endpoints { + err = util.Update(context.TODO(), NonGatewayResourceInterface(h.smClient, endpoints[i].Namespace), + h.newNonGatewayRoute(&endpoints[i]), func(existing *submarinerv1.NonGatewayRoute) (*submarinerv1.NonGatewayRoute, error) { + existing.RoutePolicySpec.NextHops = []string{h.transitSwitchIP.Get()} + return existing, nil + }) + if err != nil { + return errors.Wrapf(err, "error updating NonGatewayRoute") + } + } + + return nil +} + func (h *NonGatewayRouteHandler) newNonGatewayRoute(endpoint *submarinerv1.Endpoint) *submarinerv1.NonGatewayRoute { return &submarinerv1.NonGatewayRoute{ ObjectMeta: metav1.ObjectMeta{ @@ -149,7 +166,7 @@ func (h *NonGatewayRouteHandler) newNonGatewayRoute(endpoint *submarinerv1.Endpo }, RoutePolicySpec: submarinerv1.RoutePolicySpec{ RemoteCIDRs: endpoint.Spec.Subnets, - NextHops: []string{h.transitSwitchIP}, + NextHops: []string{h.transitSwitchIP.Get()}, }, } } diff --git a/pkg/routeagent_driver/handlers/ovn/non_gateway_route_handler_test.go b/pkg/routeagent_driver/handlers/ovn/non_gateway_route_handler_test.go index 3d106fa41..d88ab5d60 100644 --- a/pkg/routeagent_driver/handlers/ovn/non_gateway_route_handler_test.go +++ b/pkg/routeagent_driver/handlers/ovn/non_gateway_route_handler_test.go @@ -20,6 +20,7 @@ package ovn_test import ( "errors" + "os" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -27,14 +28,17 @@ import ( "github.com/submariner-io/admiral/pkg/test" submarinerv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1" "github.com/submariner-io/submariner/pkg/event/testing" + "github.com/submariner-io/submariner/pkg/routeagent_driver/constants" "github.com/submariner-io/submariner/pkg/routeagent_driver/handlers/ovn" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) var _ = Describe("NonGatewayRouteHandler", func() { t := newTestDriver() JustBeforeEach(func() { - t.Start(ovn.NewNonGatewayRouteHandler(t.submClient, t.k8sClient)) + t.Start(ovn.NewNonGatewayRouteHandler(t.submClient, t.k8sClient, ovn.NewTransitSwitchIP())) }) awaitNonGatewayRoute := func(ep *submarinerv1.Endpoint) { @@ -116,4 +120,29 @@ var _ = Describe("NonGatewayRouteHandler", func() { }) }) }) + + When("the local node's transit switch IP is updated", func() { + JustBeforeEach(func() { + t.CreateLocalHostEndpoint() + }) + + It("should update existing NonGatewayRoutes", func() { + endpoint := t.CreateEndpoint(testing.NewEndpoint("remote-cluster", "host", "193.0.4.0/24")) + awaitNonGatewayRoute(endpoint) + + t.transitSwitchIP = "10.34.87.2" + + t.UpdateNode(&corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: os.Getenv("NODE_NAME"), + Annotations: map[string]string{constants.OvnTransitSwitchIPAnnotation: toTransitSwitchIPAnnotation(t.transitSwitchIP)}, + }, + }) + + Eventually(func() string { + return test.AwaitResource(ovn.NonGatewayResourceInterface(t.submClient, testing.Namespace), + endpoint.Spec.ClusterID).RoutePolicySpec.NextHops[0] + }).Should(Equal(t.transitSwitchIP)) + }) + }) }) diff --git a/pkg/routeagent_driver/handlers/ovn/ovn_logical_routes.go b/pkg/routeagent_driver/handlers/ovn/ovn_logical_routes.go index 03e6b1e60..34201870a 100644 --- a/pkg/routeagent_driver/handlers/ovn/ovn_logical_routes.go +++ b/pkg/routeagent_driver/handlers/ovn/ovn_logical_routes.go @@ -19,6 +19,7 @@ limitations under the License. package ovn import ( + "reflect" "strings" "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/libovsdbops" @@ -84,7 +85,7 @@ func (c *ConnectionHandler) reconcileSubOvnLogicalRouterPolicies(remoteSubnets s lrpStalePredicate := func(item *nbdb.LogicalRouterPolicy) bool { subnet := strings.Split(item.Match, " ")[2] - return item.Priority == ovnRoutePoliciesPrio && !remoteSubnets.Has(subnet) + return item.Priority == ovnRoutePoliciesPrio && (!remoteSubnets.Has(subnet) || !reflect.DeepEqual(item.Nexthop, &nextHop)) } // Cleanup any existing lrps not representing the correct set of remote subnets diff --git a/pkg/routeagent_driver/handlers/ovn/ovn_suite_test.go b/pkg/routeagent_driver/handlers/ovn/ovn_suite_test.go index a11119b5c..497dabafc 100644 --- a/pkg/routeagent_driver/handlers/ovn/ovn_suite_test.go +++ b/pkg/routeagent_driver/handlers/ovn/ovn_suite_test.go @@ -42,6 +42,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilruntime "k8s.io/apimachinery/pkg/util/runtime" fakedynamic "k8s.io/client-go/dynamic/fake" + "k8s.io/client-go/kubernetes" fakek8s "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/scheme" ) @@ -117,30 +118,41 @@ func newTestDriver() *testDriver { } func (t *testDriver) Start(handler event.Handler) { - t.createNode() + node := t.createNode() t.handler = handler t.ControllerSupport.Start(handler) + t.CreateNode(node) } -func (t *testDriver) createNode() { +func (t *testDriver) createNode() *corev1.Node { + return createNode(t.k8sClient, t.transitSwitchIP) +} + +func createNode(k8sClient kubernetes.Interface, transitSwitchIP string) *corev1.Node { node := &corev1.Node{ ObjectMeta: metav1.ObjectMeta{ Name: "test-node", }, } - if t.transitSwitchIP != "" { - data := map[string]string{"ipv4": t.transitSwitchIP + "/24"} - bytes, err := json.Marshal(data) - Expect(err).To(Succeed()) - - node.Annotations = map[string]string{constants.OvnTransitSwitchIPAnnotation: string(bytes)} + if transitSwitchIP != "" { + node.Annotations = map[string]string{constants.OvnTransitSwitchIPAnnotation: toTransitSwitchIPAnnotation(transitSwitchIP)} } - _, err := t.k8sClient.CoreV1().Nodes().Create(context.Background(), node, metav1.CreateOptions{}) + _, err := k8sClient.CoreV1().Nodes().Create(context.Background(), node, metav1.CreateOptions{}) Expect(err).To(Succeed()) os.Setenv("NODE_NAME", node.Name) + + return node +} + +func toTransitSwitchIPAnnotation(ip string) string { + data := map[string]string{"ipv4": ip + "/24"} + bytes, err := json.Marshal(data) + Expect(err).To(Succeed()) + + return string(bytes) } func toIPNet(s string) *net.IPNet { diff --git a/pkg/routeagent_driver/handlers/ovn/transit_switch_ip.go b/pkg/routeagent_driver/handlers/ovn/transit_switch_ip.go new file mode 100644 index 000000000..d171902a9 --- /dev/null +++ b/pkg/routeagent_driver/handlers/ovn/transit_switch_ip.go @@ -0,0 +1,85 @@ +/* +SPDX-License-Identifier: Apache-2.0 + +Copyright Contributors to the Submariner project. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ovn + +import ( + "os" + "sync/atomic" + + "github.com/pkg/errors" + nodeutil "github.com/submariner-io/submariner/pkg/node" + "github.com/submariner-io/submariner/pkg/routeagent_driver/constants" + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/kubernetes" +) + +type TransitSwitchIPGetter interface { + Get() string +} + +type TransitSwitchIP interface { + TransitSwitchIPGetter + Init(k8sClient kubernetes.Interface) error + UpdateFrom(node *corev1.Node) (bool, error) +} + +type transitSwitchIPImpl struct { + value atomic.Value +} + +func NewTransitSwitchIP() TransitSwitchIP { + t := &transitSwitchIPImpl{} + t.value.Store("") + + return t +} + +func (t *transitSwitchIPImpl) Get() string { + return t.value.Load().(string) +} + +func (t *transitSwitchIPImpl) Init(k8sClient kubernetes.Interface) error { + node, err := nodeutil.GetLocalNode(k8sClient) + if err != nil { + return errors.Wrap(err, "error getting the local node") + } + + _, err = t.UpdateFrom(node) + + return err +} + +func (t *transitSwitchIPImpl) UpdateFrom(node *corev1.Node) (bool, error) { + if node.Name != os.Getenv("NODE_NAME") { + return false, nil + } + + value, ok := node.Annotations[constants.OvnTransitSwitchIPAnnotation] + if !ok { + logger.Infof("No transit switch IP configured on node %q", node.Name) + return false, nil + } + + transitSwitchIP, err := jsonToIP(value) + if err != nil { + return false, errors.Wrapf(err, "error parsing the transit switch IP") + } + + return transitSwitchIP != t.value.Swap(transitSwitchIP), nil +} diff --git a/pkg/routeagent_driver/handlers/ovn/transit_switch_ip_test.go b/pkg/routeagent_driver/handlers/ovn/transit_switch_ip_test.go new file mode 100644 index 000000000..a94d4fdf7 --- /dev/null +++ b/pkg/routeagent_driver/handlers/ovn/transit_switch_ip_test.go @@ -0,0 +1,136 @@ +/* +SPDX-License-Identifier: Apache-2.0 + +Copyright Contributors to the Submariner project. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ovn_test + +import ( + "context" + "os" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/submariner-io/submariner/pkg/routeagent_driver/constants" + "github.com/submariner-io/submariner/pkg/routeagent_driver/handlers/ovn" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + fakek8s "k8s.io/client-go/kubernetes/fake" +) + +var _ = Describe("TransitSwitchIP", func() { + var ( + transitSwitchIP ovn.TransitSwitchIP + nodeIP string + node *corev1.Node + ) + + BeforeEach(func() { + transitSwitchIP = ovn.NewTransitSwitchIP() + nodeIP = "" + }) + + Context("Init", func() { + var k8sClient *fakek8s.Clientset + + JustBeforeEach(func() { + k8sClient = fakek8s.NewClientset() + node = createNode(k8sClient, nodeIP) + }) + + When("the node annotation exists", func() { + BeforeEach(func() { + nodeIP = "172.1.2.3" + }) + + It("should set the TransitSwitchIP value", func() { + Expect(transitSwitchIP.Init(k8sClient)).To(Succeed()) + Expect(transitSwitchIP.Get()).To(Equal(nodeIP)) + }) + }) + + When("the node annotation does not exist", func() { + It("should succeed and set an empty TransitSwitchIP value", func() { + Expect(transitSwitchIP.Init(k8sClient)).To(Succeed()) + Expect(transitSwitchIP.Get()).To(Equal("")) + }) + }) + + When("the local node isn't found due to missing NODE_NAME env var", func() { + JustBeforeEach(func() { + os.Unsetenv("NODE_NAME") + }) + + It("should fail", func() { + Expect(transitSwitchIP.Init(k8sClient)).ToNot(Succeed()) + }) + }) + + When("the node annotation contains an invalid value", func() { + JustBeforeEach(func() { + node.Annotations = map[string]string{constants.OvnTransitSwitchIPAnnotation: "invalid"} + _, err := k8sClient.CoreV1().Nodes().Update(context.Background(), node, metav1.UpdateOptions{}) + Expect(err).To(Succeed()) + }) + + It("should fail", func() { + Expect(transitSwitchIP.Init(k8sClient)).ToNot(Succeed()) + }) + }) + }) + + Context("UpdateFrom", func() { + localNodeName := "local-node" + + BeforeEach(func() { + os.Setenv("NODE_NAME", localNodeName) + + nodeIP = "172.1.2.3" + node = &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: localNodeName, + Annotations: map[string]string{constants.OvnTransitSwitchIPAnnotation: toTransitSwitchIPAnnotation(nodeIP)}, + }, + } + }) + + When("the local node", func() { + It("should set the TransitSwitchIP value", func() { + updated, err := transitSwitchIP.UpdateFrom(node) + Expect(err).To(Succeed()) + Expect(updated).To(BeTrue()) + Expect(transitSwitchIP.Get()).To(Equal(nodeIP)) + + updated, err = transitSwitchIP.UpdateFrom(node) + Expect(err).To(Succeed()) + Expect(updated).To(BeFalse()) + }) + }) + + When("not the local node", func() { + BeforeEach(func() { + node.Name = "not-local" + }) + + It("should not set the TransitSwitchIP value", func() { + updated, err := transitSwitchIP.UpdateFrom(node) + Expect(err).To(Succeed()) + Expect(updated).To(BeFalse()) + Expect(transitSwitchIP.Get()).To(Equal("")) + }) + }) + }) +}) diff --git a/pkg/routeagent_driver/main.go b/pkg/routeagent_driver/main.go index 28f3110c7..58a937034 100644 --- a/pkg/routeagent_driver/main.go +++ b/pkg/routeagent_driver/main.go @@ -127,6 +127,7 @@ func main() { np = cni.Generic } + transitSwitchIP := ovn.NewTransitSwitchIP() submSpec := types.SubmarinerSpecification{} logger.FatalOnError(envconfig.Process("submariner", &submSpec), "Error processing env vars") @@ -145,16 +146,17 @@ func main() { registry, err := event.NewRegistry("routeagent_driver", np, kubeproxy.NewSyncHandler(env.ClusterCidr, env.ServiceCidr), ovn.NewHandler(&ovn.HandlerConfig{ - Namespace: env.Namespace, - ClusterCIDR: env.ClusterCidr, - ServiceCIDR: env.ServiceCidr, - SubmClient: smClientset, - K8sClient: k8sClientSet, - DynClient: dynamicClientSet, - WatcherConfig: config, + Namespace: env.Namespace, + ClusterCIDR: env.ClusterCidr, + ServiceCIDR: env.ServiceCidr, + SubmClient: smClientset, + K8sClient: k8sClientSet, + DynClient: dynamicClientSet, + WatcherConfig: config, + TransitSwitchIP: transitSwitchIP, }), ovn.NewGatewayRouteHandler(smClientset), - ovn.NewNonGatewayRouteHandler(smClientset, k8sClientSet), + ovn.NewNonGatewayRouteHandler(smClientset, k8sClientSet, transitSwitchIP), cabledriver.NewXRFMCleanupHandler(), cabledriver.NewVXLANCleanup(), mtu.NewMTUHandler(env.ClusterCidr, len(env.GlobalCidr) != 0, getTCPMssValue(localNode)),