Skip to content

Commit

Permalink
Revert "Revert "Handle updates to the node OVN transit switch IP""
Browse files Browse the repository at this point in the history
This reverts commit 6ac9266.
  • Loading branch information
tpantelis committed Oct 17, 2024
1 parent ae7612b commit 7582781
Show file tree
Hide file tree
Showing 10 changed files with 424 additions and 104 deletions.
19 changes: 10 additions & 9 deletions pkg/routeagent_driver/handlers/ovn/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,15 @@ import (
type NewOVSDBClientFn func(_ model.ClientDBModel, _ ...libovsdbclient.Option) (libovsdbclient.Client, error)

type HandlerConfig struct {
Namespace string
ClusterCIDR []string
ServiceCIDR []string
SubmClient clientset.Interface
K8sClient kubernetes.Interface
DynClient dynamic.Interface
WatcherConfig *watcher.Config
NewOVSDBClient NewOVSDBClientFn
Namespace string
ClusterCIDR []string
ServiceCIDR []string
SubmClient clientset.Interface
K8sClient kubernetes.Interface
DynClient dynamic.Interface
WatcherConfig *watcher.Config
NewOVSDBClient NewOVSDBClientFn
TransitSwitchIP TransitSwitchIPGetter
}

type Handler struct {
Expand Down Expand Up @@ -124,7 +125,7 @@ func (ovn *Handler) Init() error {
return err
}

nonGatewayRouteController, err := NewNonGatewayRouteController(*ovn.WatcherConfig, connectionHandler, ovn.K8sClient, ovn.Namespace)
nonGatewayRouteController, err := NewNonGatewayRouteController(*ovn.WatcherConfig, connectionHandler, ovn.Namespace, ovn.TransitSwitchIP)
if err != nil {
return err
}
Expand Down
97 changes: 78 additions & 19 deletions pkg/routeagent_driver/handlers/ovn/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ const (
var _ = Describe("Handler", func() {
t := newTestDriver()

var ovsdbClient *fakeovn.OVSDBClient
var (
ovsdbClient *fakeovn.OVSDBClient
transitSwitchIP ovn.TransitSwitchIP
)

BeforeEach(func() {
ovsdbClient = fakeovn.NewOVSDBClient()
Expand Down Expand Up @@ -79,6 +82,8 @@ var _ = Describe("Handler", func() {

restMapper := test.GetRESTMapperFor(&submarinerv1.GatewayRoute{}, &submarinerv1.NonGatewayRoute{})

transitSwitchIP = ovn.NewTransitSwitchIP()

t.Start(ovn.NewHandler(&ovn.HandlerConfig{
Namespace: testing.Namespace,
ClusterCIDR: []string{clusterCIDR},
Expand All @@ -93,9 +98,11 @@ var _ = Describe("Handler", func() {
NewOVSDBClient: func(_ model.ClientDBModel, _ ...libovsdbclient.Option) (libovsdbclient.Client, error) {
return ovsdbClient, nil
},
TransitSwitchIP: transitSwitchIP,
}))

Expect(ovsdbClient.Connected()).To(BeTrue())
Expect(transitSwitchIP.Init(t.k8sClient)).To(Succeed())
})

When("a remote Endpoint is created, updated, and deleted", func() {
Expand Down Expand Up @@ -254,37 +261,89 @@ var _ = Describe("Handler", func() {
})
})

When("a NonGatewayRoute is created and deleted", func() {
When("NonGatewayRoutes are created, updated and deleted", func() {
verifyLogicalRouterPolicies := func(ngr *submarinerv1.NonGatewayRoute, nextHop string) {
for _, cidr := range ngr.RoutePolicySpec.RemoteCIDRs {
ovsdbClient.AwaitModel(&nbdb.LogicalRouterPolicy{
Match: cidr,
Nexthop: ptr.To(nextHop),
})
}
}

verifyNoLogicalRouterPolicies := func(ngr *submarinerv1.NonGatewayRoute, nextHop string) {
for _, cidr := range ngr.RoutePolicySpec.RemoteCIDRs {
ovsdbClient.AwaitNoModel(&nbdb.LogicalRouterPolicy{
Match: cidr,
Nexthop: ptr.To(nextHop),
})
}
}

It("should correctly reconcile OVN router policies", func() {
client := t.dynClient.Resource(submarinerv1.SchemeGroupVersion.WithResource("nongatewayroutes")).Namespace(testing.Namespace)

nonGWRoute := &submarinerv1.NonGatewayRoute{
By("Creating first NonGatewayRoute")

nextHop := "172.1.1.1"

nonGWRoute1 := &submarinerv1.NonGatewayRoute{
ObjectMeta: metav1.ObjectMeta{
Name: "test-nongateway-route",
Name: "test-nongateway-route1",
},
RoutePolicySpec: submarinerv1.RoutePolicySpec{
NextHops: []string{"111.1.1.1"},
RemoteCIDRs: []string{"192.0.1.0/24", "192.0.2.0/24"},
NextHops: []string{nextHop},
RemoteCIDRs: []string{"111.0.1.0/24", "111.0.2.0/24"},
},
}

test.CreateResource(client, nonGWRoute)
test.CreateResource(client, nonGWRoute1)

for _, cidr := range nonGWRoute.RoutePolicySpec.RemoteCIDRs {
ovsdbClient.AwaitModel(&nbdb.LogicalRouterPolicy{
Match: cidr,
Nexthop: ptr.To(nonGWRoute.RoutePolicySpec.NextHops[0]),
})
}
verifyLogicalRouterPolicies(nonGWRoute1, nextHop)

Expect(client.Delete(context.Background(), nonGWRoute.Name, metav1.DeleteOptions{})).To(Succeed())
By("Creating second NonGatewayRoute")

for _, cidr := range nonGWRoute.RoutePolicySpec.RemoteCIDRs {
ovsdbClient.AwaitNoModel(&nbdb.LogicalRouterPolicy{
Match: cidr,
Nexthop: ptr.To(nonGWRoute.RoutePolicySpec.NextHops[0]),
})
nonGWRoute2 := &submarinerv1.NonGatewayRoute{
ObjectMeta: metav1.ObjectMeta{
Name: "test-nongateway-route2",
},
RoutePolicySpec: submarinerv1.RoutePolicySpec{
NextHops: []string{nextHop},
RemoteCIDRs: []string{"222.0.1.0/24", "222.0.2.0/24"},
},
}

test.CreateResource(client, nonGWRoute2)

verifyLogicalRouterPolicies(nonGWRoute1, nextHop)
verifyLogicalRouterPolicies(nonGWRoute2, nextHop)

By("Updating NextHop for first NonGatewayRoute")

prevNextHop := nextHop
nextHop = "172.1.1.2"
nonGWRoute1.RoutePolicySpec.NextHops[0] = nextHop

test.UpdateResource(client, nonGWRoute1)

verifyLogicalRouterPolicies(nonGWRoute1, nextHop)
verifyNoLogicalRouterPolicies(nonGWRoute1, prevNextHop)
verifyNoLogicalRouterPolicies(nonGWRoute2, prevNextHop)

By("Updating NextHop for second NonGatewayRoute")

nonGWRoute2.RoutePolicySpec.NextHops[0] = nextHop

test.UpdateResource(client, nonGWRoute2)

verifyLogicalRouterPolicies(nonGWRoute1, nextHop)
verifyLogicalRouterPolicies(nonGWRoute2, nextHop)

By("Deleting first NonGatewayRoute")

Expect(client.Delete(context.Background(), nonGWRoute1.Name, metav1.DeleteOptions{})).To(Succeed())

verifyNoLogicalRouterPolicies(nonGWRoute1, nextHop)
})
})

Expand Down
32 changes: 5 additions & 27 deletions pkg/routeagent_driver/handlers/ovn/non_gateway_route_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,21 @@ import (
"github.com/pkg/errors"
"github.com/submariner-io/admiral/pkg/watcher"
submarinerv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1"
nodeutil "github.com/submariner-io/submariner/pkg/node"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
clientset "k8s.io/client-go/kubernetes"
)

type NonGatewayRouteController struct {
nonGatewayRouteWatcher watcher.Interface
connectionHandler *ConnectionHandler
remoteSubnets sets.Set[string]
stopCh chan struct{}
transitSwitchIP string
transitSwitchIP TransitSwitchIPGetter
}

//nolint:gocritic // Ignore hugeParam
func NewNonGatewayRouteController(config watcher.Config, connectionHandler *ConnectionHandler,
k8sClientSet clientset.Interface, namespace string,
namespace string, transitSwitchIP TransitSwitchIPGetter,
) (*NonGatewayRouteController, error) {
// We'll panic if config is nil, this is intentional
var err error
Expand All @@ -47,6 +45,7 @@ func NewNonGatewayRouteController(config watcher.Config, connectionHandler *Conn
connectionHandler: connectionHandler,
remoteSubnets: sets.New[string](),
stopCh: make(chan struct{}),
transitSwitchIP: transitSwitchIP,
}

config.ResourceConfigs = []watcher.ResourceConfig{
Expand All @@ -62,25 +61,6 @@ func NewNonGatewayRouteController(config watcher.Config, connectionHandler *Conn
},
}

node, err := nodeutil.GetLocalNode(k8sClientSet)
if err != nil {
return nil, errors.Wrap(err, "error getting the local node info")
}

annotations := node.GetAnnotations()

transitSwitchIP := annotations["k8s.ovn.org/node-transit-switch-port-ifaddr"]
if transitSwitchIP == "" {
// This is a non-IC setup , so this controller will not be started.
logger.Infof("No transit switch IP configured on node %q", node.Name)
return controller, nil
}

controller.transitSwitchIP, err = jsonToIP(transitSwitchIP)
if err != nil {
return nil, errors.Wrapf(err, "error parsing transit switch IP")
}

controller.nonGatewayRouteWatcher, err = watcher.New(&config)
if err != nil {
return nil, errors.Wrap(err, "error creating resource watcher")
Expand Down Expand Up @@ -129,7 +109,7 @@ func (g *NonGatewayRouteController) reconcileRemoteSubnets(submNonGWRoute *subma
}

// If this node belongs to same zone as gateway node, ignore the event.
if submNonGWRoute.RoutePolicySpec.NextHops[0] != g.transitSwitchIP {
if submNonGWRoute.RoutePolicySpec.NextHops[0] != g.transitSwitchIP.Get() {
for _, subnet := range submNonGWRoute.RoutePolicySpec.RemoteCIDRs {
if addSubnet {
g.remoteSubnets.Insert(subnet)
Expand All @@ -145,7 +125,5 @@ func (g *NonGatewayRouteController) reconcileRemoteSubnets(submNonGWRoute *subma
}

func (g *NonGatewayRouteController) stop() {
if g.transitSwitchIP != "" {
close(g.stopCh)
}
close(g.stopCh)
}
77 changes: 47 additions & 30 deletions pkg/routeagent_driver/handlers/ovn/non_gateway_route_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,47 +28,32 @@ import (
submarinerClientset "github.com/submariner-io/submariner/pkg/client/clientset/versioned"
"github.com/submariner-io/submariner/pkg/cni"
"github.com/submariner-io/submariner/pkg/event"
nodeutil "github.com/submariner-io/submariner/pkg/node"
"github.com/submariner-io/submariner/pkg/routeagent_driver/constants"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes"
)

type NonGatewayRouteHandler struct {
event.HandlerBase
event.NodeHandlerBase
smClient submarinerClientset.Interface
k8sClient clientset.Interface
transitSwitchIP string
k8sClient kubernetes.Interface
transitSwitchIP TransitSwitchIP
}

func NewNonGatewayRouteHandler(smClient submarinerClientset.Interface, k8sClient clientset.Interface) *NonGatewayRouteHandler {
func NewNonGatewayRouteHandler(smClient submarinerClientset.Interface, k8sClient kubernetes.Interface, transitSwitchIP TransitSwitchIP,
) *NonGatewayRouteHandler {
return &NonGatewayRouteHandler{
smClient: smClient,
k8sClient: k8sClient,
smClient: smClient,
k8sClient: k8sClient,
transitSwitchIP: transitSwitchIP,
}
}

func (h *NonGatewayRouteHandler) Init() error {
logger.Info("Starting NonGatewayRouteHandler")

node, err := nodeutil.GetLocalNode(h.k8sClient)
if err != nil {
return errors.Wrap(err, "error getting the g/w node")
}

annotations := node.GetAnnotations()

// TODO transitSwitchIP changes support needs to be added.
transitSwitchIP, ok := annotations[constants.OvnTransitSwitchIPAnnotation]
if !ok {
logger.Infof("No transit switch IP configured")
return nil
}

h.transitSwitchIP, err = jsonToIP(transitSwitchIP)

return errors.Wrapf(err, "error parsing the transit switch IP")
return errors.Wrap(h.transitSwitchIP.Init(h.k8sClient), "error initializing TransitSwitchIP")
}

func (h *NonGatewayRouteHandler) GetName() string {
Expand All @@ -80,7 +65,7 @@ func (h *NonGatewayRouteHandler) GetNetworkPlugins() []string {
}

func (h *NonGatewayRouteHandler) RemoteEndpointCreated(endpoint *submarinerv1.Endpoint) error {
if !h.State().IsOnGateway() || h.transitSwitchIP == "" {
if !h.State().IsOnGateway() || h.transitSwitchIP.Get() == "" {
return nil
}

Expand All @@ -98,7 +83,7 @@ func (h *NonGatewayRouteHandler) RemoteEndpointCreated(endpoint *submarinerv1.En
}

func (h *NonGatewayRouteHandler) RemoteEndpointRemoved(endpoint *submarinerv1.Endpoint) error {
if !h.State().IsOnGateway() || h.transitSwitchIP == "" {
if !h.State().IsOnGateway() || h.transitSwitchIP.Get() == "" {
return nil
}

Expand All @@ -113,7 +98,7 @@ func (h *NonGatewayRouteHandler) RemoteEndpointRemoved(endpoint *submarinerv1.En
}

func (h *NonGatewayRouteHandler) TransitionToGateway() error {
if h.transitSwitchIP == "" {
if h.transitSwitchIP.Get() == "" {
return nil
}

Expand Down Expand Up @@ -141,6 +126,38 @@ func (h *NonGatewayRouteHandler) TransitionToGateway() error {
return nil
}

func (h *NonGatewayRouteHandler) NodeUpdated(node *corev1.Node) error {
updated, err := h.transitSwitchIP.UpdateFrom(node)
if err != nil {
logger.Errorf(err, "Error updating transit switch IP from node: %s", resource.ToJSON(node))
return nil
}

if !updated {
return nil
}

logger.Infof("Transit switch IP updated to %s", h.transitSwitchIP.Get())

if !h.State().IsOnGateway() {
return nil
}

endpoints := h.State().GetRemoteEndpoints()
for i := range endpoints {
err = util.Update(context.TODO(), NonGatewayResourceInterface(h.smClient, endpoints[i].Namespace),
h.newNonGatewayRoute(&endpoints[i]), func(existing *submarinerv1.NonGatewayRoute) (*submarinerv1.NonGatewayRoute, error) {
existing.RoutePolicySpec.NextHops = []string{h.transitSwitchIP.Get()}
return existing, nil
})
if err != nil {
return errors.Wrapf(err, "error updating NonGatewayRoute")
}
}

return nil
}

func (h *NonGatewayRouteHandler) newNonGatewayRoute(endpoint *submarinerv1.Endpoint) *submarinerv1.NonGatewayRoute {
return &submarinerv1.NonGatewayRoute{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -149,7 +166,7 @@ func (h *NonGatewayRouteHandler) newNonGatewayRoute(endpoint *submarinerv1.Endpo
},
RoutePolicySpec: submarinerv1.RoutePolicySpec{
RemoteCIDRs: endpoint.Spec.Subnets,
NextHops: []string{h.transitSwitchIP},
NextHops: []string{h.transitSwitchIP.Get()},
},
}
}
Expand Down
Loading

0 comments on commit 7582781

Please sign in to comment.