From b1326efcd8b09ef03c76f0410dbd72b332ca0846 Mon Sep 17 00:00:00 2001 From: Yossi Boaron Date: Sun, 19 Jan 2025 10:35:12 +0200 Subject: [PATCH] Gateway - use endpoint set private, public and healthcheck IPs This PR : - updates gateway code to use endpoint SetHealthCheckIP ,SetPrivateIP and SetPublicIP. - add placeholders for IPV6 public,private and healthcheck IPs. Signed-off-by: Yossi Boaron --- pkg/endpoint/local_endpoint.go | 54 +++++++++++++++++++++++-------- pkg/endpoint/local_ip.go | 14 ++++++++ pkg/endpoint/public_ip.go | 22 +++++++++++++ pkg/endpoint/public_ip_watcher.go | 37 +++++++++++++++------ pkg/gateway/gateway.go | 5 +-- pkg/types/types.go | 9 ++++++ 6 files changed, 116 insertions(+), 25 deletions(-) diff --git a/pkg/endpoint/local_endpoint.go b/pkg/endpoint/local_endpoint.go index 32a1cb8ae..d3363462f 100644 --- a/pkg/endpoint/local_endpoint.go +++ b/pkg/endpoint/local_endpoint.go @@ -39,9 +39,11 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + k8serrors "k8s.io/apimachinery/pkg/util/errors" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" + k8snet "k8s.io/utils/net" "k8s.io/utils/set" logf "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -137,7 +139,7 @@ func GetLocalSpec(ctx context.Context, submSpec *types.SubmarinerSpecification, airGappedDeployment bool, ) (*submv1.EndpointSpec, error) { // We'll panic if submSpec is nil, this is intentional - privateIP := GetLocalIP() + privateIPs := GetLocalIPs(submSpec.GetIPFamilies()) gwNode, err := node.GetLocalNode(ctx, k8sClient) if err != nil { @@ -157,6 +159,7 @@ func GetLocalSpec(ctx context.Context, submSpec *types.SubmarinerSpecification, localSubnets = submSpec.GlobalCidr globalnetEnabled = true } else { + // TODO_IPV6: set localSubnets to submSpec.ServiceCidr + submSpec.ClusterCidr localSubnets = append(localSubnets, cidr.ExtractIPv4Subnets(submSpec.ServiceCidr)...) localSubnets = append(localSubnets, cidr.ExtractIPv4Subnets(submSpec.ClusterCidr)...) } @@ -171,42 +174,67 @@ func GetLocalSpec(ctx context.Context, submSpec *types.SubmarinerSpecification, } endpointSpec := &submv1.EndpointSpec{ - CableName: fmt.Sprintf("submariner-cable-%s-%s", submSpec.ClusterID, strings.ReplaceAll(privateIP, ".", "-")), ClusterID: submSpec.ClusterID, Hostname: hostname, - PrivateIP: privateIP, NATEnabled: submSpec.NATEnabled, Subnets: localSubnets, Backend: submSpec.CableDriver, BackendConfig: backendConfig, } - publicIP, resolver, err := getPublicIP(submSpec, k8sClient, backendConfig, airGappedDeployment) - if err != nil { - return nil, errors.Wrap(err, "could not determine public IP") + for _, privateIP := range privateIPs { + endpointSpec.SetPrivateIP(privateIP) } - logger.Infof("Obtained local endpoint public IP %q using resolver %q", publicIP, resolver) + endpointSpec.CableName = fmt.Sprintf("submariner-cable-%s-%s", submSpec.ClusterID, + strings.ReplaceAll(endpointSpec.GetPrivateIP(k8snet.IPv4), ".", "-")) + publicIPs, err := getPublicIPs(submSpec, k8sClient, backendConfig, airGappedDeployment) + + if len(publicIPs) == 0 { + return nil, errors.Wrap(err, "could not determine public IP") + } - endpointSpec.PublicIP = publicIP + for _, publicIP := range publicIPs { + endpointSpec.SetPublicIP(publicIP) + } if submSpec.HealthCheckEnabled && !globalnetEnabled { // When globalnet is enabled, HealthCheckIP will be the globalIP assigned to the Active GatewayNode. // In a fresh deployment, globalIP annotation for the node might take few seconds. So we listen on NodeEvents // and update the endpoint HealthCheckIP (to globalIP) in datastoreSyncer at a later stage. This will trigger // the HealthCheck between the clusters. - cniIface, err := cni.Discover(submSpec.ClusterCidr) - if err != nil { - return nil, fmt.Errorf("error getting CNI Interface IP address: %w."+ - "Please disable the health check if your CNI does not expose a pod IP on the nodes", err) + healthcheckIPs, err := getHealthCheckIPs(submSpec) + if len(healthcheckIPs) == 0 { + return nil, errors.Wrap(err, "could not determine healthcheck IP") } - endpointSpec.HealthCheckIP = cniIface.IPAddress + for _, healthcheckIP := range healthcheckIPs { + endpointSpec.SetHealthCheckIP(healthcheckIP) + } } return endpointSpec, nil } +func getHealthCheckIPs(submSpec *types.SubmarinerSpecification) ([]string, error) { + var healthCheckIPs []string + var errs []error + + for _, family := range submSpec.GetIPFamilies() { + if family == k8snet.IPv4 { + cniIface, err := cni.Discover(submSpec.ClusterCidr) + if err != nil { + errs = append(errs, fmt.Errorf("error getting CNI Interface IP address: %w."+ + "Please disable the health check if your CNI does not expose a pod IP on the nodes", err)) + } else { + healthCheckIPs = append(healthCheckIPs, cniIface.IPAddress) + } + } // TODO_IPV6: add V6 healthcheck IP + } + + return healthCheckIPs, errors.Wrap(k8serrors.NewAggregate(errs), "getHealthCheckIPs failed") +} + func getBackendConfig(nodeObj *v1.Node) (map[string]string, error) { backendConfig, err := getNodeBackendConfig(nodeObj) if err != nil { diff --git a/pkg/endpoint/local_ip.go b/pkg/endpoint/local_ip.go index 5e587777b..6124cc98b 100644 --- a/pkg/endpoint/local_ip.go +++ b/pkg/endpoint/local_ip.go @@ -20,6 +20,8 @@ package endpoint import ( "net" + + k8snet "k8s.io/utils/net" ) func GetLocalIPForDestination(dst string) string { @@ -37,3 +39,15 @@ func GetLocalIPForDestination(dst string) string { func GetLocalIP() string { return GetLocalIPForDestination("8.8.8.8") } + +func GetLocalIPs(ipFamilies [2]k8snet.IPFamily) []string { + var addresses []string + + for _, family := range ipFamilies { + if family == k8snet.IPv4 { + addresses = append(addresses, GetLocalIP()) + } // TODO_IPV6: add GetLocalIPV6 + } + + return addresses +} diff --git a/pkg/endpoint/public_ip.go b/pkg/endpoint/public_ip.go index 569a0bc70..c868cecf2 100644 --- a/pkg/endpoint/public_ip.go +++ b/pkg/endpoint/public_ip.go @@ -38,6 +38,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" "k8s.io/client-go/util/retry" + k8snet "k8s.io/utils/net" ) type publicIPResolverFunction func(clientset kubernetes.Interface, namespace, value string) (string, error) @@ -63,6 +64,27 @@ func getPublicIPResolvers() string { return strings.Join(serverList, ",") } +func getPublicIPs(submSpec *types.SubmarinerSpecification, k8sClient kubernetes.Interface, + backendConfig map[string]string, airGapped bool, +) ([]string, error) { + var publicIPs []string + var errs []error + + for _, family := range submSpec.GetIPFamilies() { + if family == k8snet.IPv4 { + publicIP, resolver, err := getPublicIP(submSpec, k8sClient, backendConfig, airGapped) + if err != nil { + errs = append(errs, errors.Wrap(err, "could not determine public IP")) + } else { + logger.Infof("Obtained local endpoint public IP %q using resolver %q", publicIP, resolver) + publicIPs = append(publicIPs, publicIP) + } + } // TODO_IPV6: add getPublicIPV6 + } + + return publicIPs, errors.Wrap(k8serrors.NewAggregate(errs), "getPublicIPs failed") +} + func getPublicIP(submSpec *types.SubmarinerSpecification, k8sClient kubernetes.Interface, backendConfig map[string]string, airGapped bool, ) (string, string, error) { diff --git a/pkg/endpoint/public_ip_watcher.go b/pkg/endpoint/public_ip_watcher.go index 7ff5b001f..65aeaf144 100644 --- a/pkg/endpoint/public_ip_watcher.go +++ b/pkg/endpoint/public_ip_watcher.go @@ -27,6 +27,7 @@ import ( "github.com/submariner-io/submariner/pkg/types" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" + k8snet "k8s.io/utils/net" ) type PublicIPWatcherConfig struct { @@ -63,28 +64,44 @@ func (p *PublicIPWatcher) Run(stopCh <-chan struct{}) { } func (p *PublicIPWatcher) syncPublicIP() { + var publicIPs []string localEndpointSpec := p.config.LocalEndpoint.Spec() - publicIP, resolver, err := getPublicIP(p.config.SubmSpec, p.config.K8sClient, localEndpointSpec.BackendConfig, false) - if err != nil { - logger.Warningf("Could not determine public IP of the gateway node %q: %v", localEndpointSpec.Hostname, err) - return + publicIPsChanged := false + + for _, family := range p.config.SubmSpec.GetIPFamilies() { + if family == k8snet.IPv4 { + publicIP, resolver, err := getPublicIP(p.config.SubmSpec, p.config.K8sClient, localEndpointSpec.BackendConfig, false) + if err != nil { + logger.Warningf("Could not determine public IPV4 of the gateway node %q: %v", localEndpointSpec.Hostname, err) + } else { + if publicIP != localEndpointSpec.GetPublicIP(k8snet.IPv4) { + publicIPsChanged = true + + logger.Infof("Public IPv4 changed for the Gateway, updating the local endpoint with public IP %q obtained from resolver %q", + publicIP, resolver) + } + + publicIPs = append(publicIPs, publicIP) + } + } // TODO_IPV6: add getPublicIPV6 } - if localEndpointSpec.PublicIP != publicIP { - logger.Infof("Public IP changed for the Gateway, updating the local endpoint with public IP %q obtained from resolver %q", - publicIP, resolver) + if publicIPsChanged { + logger.Infof("Public IPs changed for the Gateway, updating the local endpoint with public IPs %q", publicIPs) - if err := p.updateLocalEndpoint(publicIP); err != nil { + if err := p.updateLocalEndpoint(publicIPs); err != nil { logger.Error(err, "Error updating the public IP for local endpoint") return } } } -func (p *PublicIPWatcher) updateLocalEndpoint(publicIP string) error { +func (p *PublicIPWatcher) updateLocalEndpoint(publicIPs []string) error { err := p.config.LocalEndpoint.Update(context.TODO(), func(existing *submv1.EndpointSpec) { - existing.PublicIP = publicIP + for _, publicIP := range publicIPs { + existing.SetPublicIP(publicIP) + } }) return errors.Wrap(err, "error updating the public IP of the local endpoint") diff --git a/pkg/gateway/gateway.go b/pkg/gateway/gateway.go index a63e391b0..4abd42fc2 100644 --- a/pkg/gateway/gateway.go +++ b/pkg/gateway/gateway.go @@ -435,8 +435,9 @@ func submarinerClusterFrom(submSpec *types.SubmarinerSpecification) *types.Subma return &types.SubmarinerCluster{ ID: submSpec.ClusterID, Spec: subv1.ClusterSpec{ - ClusterID: submSpec.ClusterID, - ColorCodes: []string{"blue"}, // This is a fake value, used only for upgrade purposes + ClusterID: submSpec.ClusterID, + ColorCodes: []string{"blue"}, // This is a fake value, used only for upgrade purposes + // TODO_IPV6: delete cidr.ExtractIPv4Subnets ServiceCIDR: cidr.ExtractIPv4Subnets(submSpec.ServiceCidr), ClusterCIDR: cidr.ExtractIPv4Subnets(submSpec.ClusterCidr), GlobalCIDR: globalCIDR, diff --git a/pkg/types/types.go b/pkg/types/types.go index 6f5c8f3c9..528e3cbcc 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -20,6 +20,7 @@ package types import ( subv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1" + k8snet "k8s.io/utils/net" ) type SubmarinerCluster struct { @@ -50,3 +51,11 @@ type SubmarinerSpecification struct { HealthCheckMaxPacketLossCount int MetricsPort int `default:"32780"` } + +func (subSpec *SubmarinerSpecification) GetIPFamilies() [2]k8snet.IPFamily { + var ipFamilies [2]k8snet.IPFamily + // TODO_IPV6: set ipFamilies according to ClusterCidr content + ipFamilies[0] = k8snet.IPv4 + + return ipFamilies +}