From 4434394ea2d61f0e9dbcf96ca90f258e9bc0356a Mon Sep 17 00:00:00 2001 From: Yossi Boaron Date: Sun, 19 Jan 2025 10:35:12 +0200 Subject: [PATCH] Gateway - use endpoint set private, public and healthcheck IPs This PR : - updates gateway code to use endpoint SetHealthCheckIP ,SetPrivateIP and SetPublicIP. - add placeholders for IPV6 public,private and healthcheck IPs. Signed-off-by: Yossi Boaron --- pkg/endpoint/local_endpoint.go | 53 +++++++++++++----- pkg/endpoint/local_endpoint_test.go | 3 +- pkg/endpoint/local_ip.go | 14 ++++- pkg/endpoint/public_ip.go | 72 ++++++++++++++----------- pkg/endpoint/public_ip_internal_test.go | 19 +++---- pkg/endpoint/public_ip_watcher.go | 28 ++++++---- pkg/gateway/gateway.go | 5 +- pkg/types/types.go | 9 ++++ 8 files changed, 133 insertions(+), 70 deletions(-) diff --git a/pkg/endpoint/local_endpoint.go b/pkg/endpoint/local_endpoint.go index 32a1cb8ae..9e01aa833 100644 --- a/pkg/endpoint/local_endpoint.go +++ b/pkg/endpoint/local_endpoint.go @@ -42,6 +42,7 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" + k8snet "k8s.io/utils/net" "k8s.io/utils/set" logf "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -136,9 +137,6 @@ func (l *Local) Create(ctx context.Context) error { func GetLocalSpec(ctx context.Context, submSpec *types.SubmarinerSpecification, k8sClient kubernetes.Interface, airGappedDeployment bool, ) (*submv1.EndpointSpec, error) { - // We'll panic if submSpec is nil, this is intentional - privateIP := GetLocalIP() - gwNode, err := node.GetLocalNode(ctx, k8sClient) if err != nil { return nil, errors.Wrap(err, "getting information on the local node") @@ -157,6 +155,7 @@ func GetLocalSpec(ctx context.Context, submSpec *types.SubmarinerSpecification, localSubnets = submSpec.GlobalCidr globalnetEnabled = true } else { + // TODO_IPV6: set localSubnets to submSpec.ServiceCidr + submSpec.ClusterCidr localSubnets = append(localSubnets, cidr.ExtractIPv4Subnets(submSpec.ServiceCidr)...) localSubnets = append(localSubnets, cidr.ExtractIPv4Subnets(submSpec.ClusterCidr)...) } @@ -171,40 +170,66 @@ func GetLocalSpec(ctx context.Context, submSpec *types.SubmarinerSpecification, } endpointSpec := &submv1.EndpointSpec{ - CableName: fmt.Sprintf("submariner-cable-%s-%s", submSpec.ClusterID, strings.ReplaceAll(privateIP, ".", "-")), ClusterID: submSpec.ClusterID, Hostname: hostname, - PrivateIP: privateIP, NATEnabled: submSpec.NATEnabled, Subnets: localSubnets, Backend: submSpec.CableDriver, BackendConfig: backendConfig, } - publicIP, resolver, err := getPublicIP(submSpec, k8sClient, backendConfig, airGappedDeployment) - if err != nil { - return nil, errors.Wrap(err, "could not determine public IP") + for _, family := range submSpec.GetIPFamilies() { + endpointSpec.SetPrivateIP(GetLocalIP(family)) } - logger.Infof("Obtained local endpoint public IP %q using resolver %q", publicIP, resolver) + endpointSpec.CableName = fmt.Sprintf("submariner-cable-%s-%s", submSpec.ClusterID, + strings.ReplaceAll(endpointSpec.GetPrivateIP(k8snet.IPv4), ".", "-")) + + for _, family := range submSpec.GetIPFamilies() { + publicIP, resolver, err := getPublicIP(family, submSpec, k8sClient, backendConfig, airGappedDeployment) + if err != nil { + return nil, errors.Wrapf(err, "could not determine public IP%v", family) + } - endpointSpec.PublicIP = publicIP + logger.Infof("Obtained local endpoint public IP%v %q using resolver %q", family, publicIP, resolver) + endpointSpec.SetPublicIP(publicIP) + } if submSpec.HealthCheckEnabled && !globalnetEnabled { // When globalnet is enabled, HealthCheckIP will be the globalIP assigned to the Active GatewayNode. // In a fresh deployment, globalIP annotation for the node might take few seconds. So we listen on NodeEvents // and update the endpoint HealthCheckIP (to globalIP) in datastoreSyncer at a later stage. This will trigger // the HealthCheck between the clusters. + for _, family := range submSpec.GetIPFamilies() { + healthcheckIP, err := getHealthCheckIP(family, submSpec) + if err != nil { + return nil, fmt.Errorf("error getting HealthCheckIP%v: %w", family, err) + } + + endpointSpec.SetHealthCheckIP(healthcheckIP) + } + } + + return endpointSpec, nil +} + +func getHealthCheckIP(family k8snet.IPFamily, submSpec *types.SubmarinerSpecification) (string, error) { + switch family { + case k8snet.IPv4: cniIface, err := cni.Discover(submSpec.ClusterCidr) if err != nil { - return nil, fmt.Errorf("error getting CNI Interface IP address: %w."+ - "Please disable the health check if your CNI does not expose a pod IP on the nodes", err) + return "", errors.Wrapf(err, "error getting CNI Interface IP address."+ + "Please disable the health check if your CNI does not expose a pod IP on the nodes") } - endpointSpec.HealthCheckIP = cniIface.IPAddress + return cniIface.IPAddress, nil + case k8snet.IPv6: + // TODO_IPV6: add V6 healthcheck IP + + case k8snet.IPFamilyUnknown: } - return endpointSpec, nil + return "", nil } func getBackendConfig(nodeObj *v1.Node) (map[string]string, error) { diff --git a/pkg/endpoint/local_endpoint_test.go b/pkg/endpoint/local_endpoint_test.go index ba80d3c2d..166af0913 100644 --- a/pkg/endpoint/local_endpoint_test.go +++ b/pkg/endpoint/local_endpoint_test.go @@ -35,6 +35,7 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/scheme" + k8snet "k8s.io/utils/net" ) const testNodeName = "this-node" @@ -46,7 +47,7 @@ var _ = Describe("GetLocalSpec", func() { node *v1.Node ) - testPrivateIP := endpoint.GetLocalIP() + testPrivateIP := endpoint.GetLocalIP(k8snet.IPv4) const ( testIPv4Label = "ipv4:" diff --git a/pkg/endpoint/local_ip.go b/pkg/endpoint/local_ip.go index 5e587777b..95ff56355 100644 --- a/pkg/endpoint/local_ip.go +++ b/pkg/endpoint/local_ip.go @@ -20,6 +20,8 @@ package endpoint import ( "net" + + k8snet "k8s.io/utils/net" ) func GetLocalIPForDestination(dst string) string { @@ -34,6 +36,14 @@ func GetLocalIPForDestination(dst string) string { return localAddr.IP.String() } -func GetLocalIP() string { - return GetLocalIPForDestination("8.8.8.8") +func GetLocalIP(family k8snet.IPFamily) string { + switch family { + case k8snet.IPv4: + return GetLocalIPForDestination("8.8.8.8") + case k8snet.IPv6: + // TODO_IPV6: add V6 healthcheck IP + case k8snet.IPFamilyUnknown: + } + + return "" } diff --git a/pkg/endpoint/public_ip.go b/pkg/endpoint/public_ip.go index 569a0bc70..e09972937 100644 --- a/pkg/endpoint/public_ip.go +++ b/pkg/endpoint/public_ip.go @@ -38,6 +38,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" "k8s.io/client-go/util/retry" + k8snet "k8s.io/utils/net" ) type publicIPResolverFunction func(clientset kubernetes.Interface, namespace, value string) (string, error) @@ -63,51 +64,58 @@ func getPublicIPResolvers() string { return strings.Join(serverList, ",") } -func getPublicIP(submSpec *types.SubmarinerSpecification, k8sClient kubernetes.Interface, +func getPublicIP(family k8snet.IPFamily, submSpec *types.SubmarinerSpecification, k8sClient kubernetes.Interface, backendConfig map[string]string, airGapped bool, ) (string, string, error) { - // If the node is annotated with a public-ip, the same is used as the public-ip of local endpoint. - config, ok := backendConfig[v1.PublicIP] - if !ok { - if submSpec.PublicIP != "" { - config = submSpec.PublicIP - } else { - config = getPublicIPResolvers() + switch family { + case k8snet.IPv4: + // If the node is annotated with a public-ip, the same is used as the public-ip of local endpoint. + config, ok := backendConfig[v1.PublicIP] + if !ok { + if submSpec.PublicIP != "" { + config = submSpec.PublicIP + } else { + config = getPublicIPResolvers() + } } - } - if airGapped { - ip, resolver, err := resolveIPInAirGappedDeployment(k8sClient, submSpec.Namespace, config) - if err != nil { - logger.Errorf(err, "Error resolving public IP in an air-gapped deployment, using empty value: %s", config) - return "", "", nil + if airGapped { + ip, resolver, err := resolveIPInAirGappedDeployment(k8sClient, submSpec.Namespace, config) + if err != nil { + logger.Errorf(err, "Error resolving public IP%s in an air-gapped deployment, using empty value: %s", family, config) + return "", "", nil + } + + return ip, resolver, nil } - return ip, resolver, nil - } + resolvers := strings.Split(config, ",") + errs := make([]error, 0, len(resolvers)) - resolvers := strings.Split(config, ",") - errs := make([]error, 0, len(resolvers)) + for _, resolver := range resolvers { + resolver = strings.Trim(resolver, " ") - for _, resolver := range resolvers { - resolver = strings.Trim(resolver, " ") + parts := strings.Split(resolver, ":") + if len(parts) != 2 { + return "", "", errors.Errorf("invalid format for %q annotation: %q", v1.GatewayConfigPrefix+v1.PublicIP, config) + } - parts := strings.Split(resolver, ":") - if len(parts) != 2 { - return "", "", errors.Errorf("invalid format for %q annotation: %q", v1.GatewayConfigPrefix+v1.PublicIP, config) - } + ip, err := resolvePublicIP(k8sClient, submSpec.Namespace, parts) + if err == nil { + return ip, resolver, nil + } - ip, err := resolvePublicIP(k8sClient, submSpec.Namespace, parts) - if err == nil { - return ip, resolver, nil + // If this resolver failed, we log it, but we fall back to the next one + errs = append(errs, errors.Wrapf(err, "\nResolver[%q]", resolver)) } - // If this resolver failed, we log it, but we fall back to the next one - errs = append(errs, errors.Wrapf(err, "\nResolver[%q]", resolver)) - } + if len(resolvers) > 0 { + return "", "", errors.Wrapf(k8serrors.NewAggregate(errs), "Unable to resolve public IP by any of the resolver methods") + } - if len(resolvers) > 0 { - return "", "", errors.Wrapf(k8serrors.NewAggregate(errs), "Unable to resolve public IP by any of the resolver methods") + case k8snet.IPv6: + // TODO_IPV6: add V6 healthcheck IP + case k8snet.IPFamilyUnknown: } return "", "", nil diff --git a/pkg/endpoint/public_ip_internal_test.go b/pkg/endpoint/public_ip_internal_test.go index 85c800eb6..74478420e 100644 --- a/pkg/endpoint/public_ip_internal_test.go +++ b/pkg/endpoint/public_ip_internal_test.go @@ -28,6 +28,7 @@ import ( v1 "k8s.io/api/core/v1" v1meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes/fake" + k8snet "k8s.io/utils/net" ) var _ = Describe("firstIPv4InString", func() { @@ -81,7 +82,7 @@ var _ = Describe("public ip resolvers", func() { It("should return the IP", func() { backendConfig[publicIPConfig] = lbPublicIP client := fake.NewClientset(serviceWithIngress(v1.LoadBalancerIngress{Hostname: "", IP: testIP})) - ip, resolver, err := getPublicIP(submSpec, client, backendConfig, false) + ip, resolver, err := getPublicIP(k8snet.IPv4, submSpec, client, backendConfig, false) Expect(err).ToNot(HaveOccurred()) Expect(ip).To(Equal(testIP)) Expect(resolver).To(Equal(lbPublicIP)) @@ -95,7 +96,7 @@ var _ = Describe("public ip resolvers", func() { Hostname: dnsHost, IP: "", })) - ip, resolver, err := getPublicIP(submSpec, client, backendConfig, false) + ip, resolver, err := getPublicIP(k8snet.IPv4, submSpec, client, backendConfig, false) Expect(err).ToNot(HaveOccurred()) Expect(ip).To(Equal(testIPDNS)) Expect(resolver).To(Equal(lbPublicIP)) @@ -109,7 +110,7 @@ var _ = Describe("public ip resolvers", func() { loadBalancerRetryConfig.Steps = 1 backendConfig[publicIPConfig] = lbPublicIP client := fake.NewClientset(serviceWithIngress()) - _, _, err := getPublicIP(submSpec, client, backendConfig, false) + _, _, err := getPublicIP(k8snet.IPv4, submSpec, client, backendConfig, false) Expect(err).To(HaveOccurred()) }) }) @@ -118,7 +119,7 @@ var _ = Describe("public ip resolvers", func() { It("should return the IP", func() { backendConfig[publicIPConfig] = ipv4PublicIP client := fake.NewClientset() - ip, resolver, err := getPublicIP(submSpec, client, backendConfig, false) + ip, resolver, err := getPublicIP(k8snet.IPv4, submSpec, client, backendConfig, false) Expect(err).ToNot(HaveOccurred()) Expect(ip).To(Equal(testIP)) Expect(resolver).To(Equal(ipv4PublicIP)) @@ -129,7 +130,7 @@ var _ = Describe("public ip resolvers", func() { It("should return the IP and not an empty value", func() { backendConfig[publicIPConfig] = ipv4PublicIP client := fake.NewClientset() - ip, resolver, err := getPublicIP(submSpec, client, backendConfig, true) + ip, resolver, err := getPublicIP(k8snet.IPv4, submSpec, client, backendConfig, true) Expect(err).ToNot(HaveOccurred()) Expect(ip).To(Equal(testIP)) Expect(resolver).To(Equal(ipv4PublicIP)) @@ -140,7 +141,7 @@ var _ = Describe("public ip resolvers", func() { It("should return the IP", func() { backendConfig[publicIPConfig] = "dns:" + dnsHost client := fake.NewClientset() - ip, resolver, err := getPublicIP(submSpec, client, backendConfig, false) + ip, resolver, err := getPublicIP(k8snet.IPv4, submSpec, client, backendConfig, false) Expect(err).ToNot(HaveOccurred()) Expect(ip).To(Equal(testIPDNS)) Expect(resolver).To(Equal(backendConfig[publicIPConfig])) @@ -151,7 +152,7 @@ var _ = Describe("public ip resolvers", func() { It("should return some IP", func() { backendConfig[publicIPConfig] = "api:4.icanhazip.com/" client := fake.NewClientset() - ip, resolver, err := getPublicIP(submSpec, client, backendConfig, false) + ip, resolver, err := getPublicIP(k8snet.IPv4, submSpec, client, backendConfig, false) Expect(err).ToNot(HaveOccurred()) Expect(net.ParseIP(ip)).NotTo(BeNil()) Expect(resolver).To(Equal(backendConfig[publicIPConfig])) @@ -162,7 +163,7 @@ var _ = Describe("public ip resolvers", func() { It("should return the first working one", func() { backendConfig[publicIPConfig] = ipv4PublicIP + ",dns:" + dnsHost client := fake.NewClientset() - ip, resolver, err := getPublicIP(submSpec, client, backendConfig, false) + ip, resolver, err := getPublicIP(k8snet.IPv4, submSpec, client, backendConfig, false) Expect(err).ToNot(HaveOccurred()) Expect(ip).To(Equal(testIP)) Expect(resolver).To(Equal(ipv4PublicIP)) @@ -173,7 +174,7 @@ var _ = Describe("public ip resolvers", func() { It("should return the first working one", func() { backendConfig[publicIPConfig] = "dns:thisdomaindoesntexistforsure.badbadbad," + ipv4PublicIP client := fake.NewClientset() - ip, resolver, err := getPublicIP(submSpec, client, backendConfig, false) + ip, resolver, err := getPublicIP(k8snet.IPv4, submSpec, client, backendConfig, false) Expect(err).ToNot(HaveOccurred()) Expect(ip).To(Equal(testIP)) Expect(resolver).To(Equal(ipv4PublicIP)) diff --git a/pkg/endpoint/public_ip_watcher.go b/pkg/endpoint/public_ip_watcher.go index 7ff5b001f..7f4e9e81a 100644 --- a/pkg/endpoint/public_ip_watcher.go +++ b/pkg/endpoint/public_ip_watcher.go @@ -63,28 +63,36 @@ func (p *PublicIPWatcher) Run(stopCh <-chan struct{}) { } func (p *PublicIPWatcher) syncPublicIP() { + var publicIPs []string localEndpointSpec := p.config.LocalEndpoint.Spec() - publicIP, resolver, err := getPublicIP(p.config.SubmSpec, p.config.K8sClient, localEndpointSpec.BackendConfig, false) - if err != nil { - logger.Warningf("Could not determine public IP of the gateway node %q: %v", localEndpointSpec.Hostname, err) - return + for _, family := range p.config.SubmSpec.GetIPFamilies() { + publicIP, _, err := getPublicIP(family, p.config.SubmSpec, p.config.K8sClient, localEndpointSpec.BackendConfig, false) + if err != nil { + logger.Warningf("Could not determine public IP for family %v of the gateway node %q: %v", family, localEndpointSpec.Hostname, err) + continue + } + + if publicIP != localEndpointSpec.GetPublicIP(family) { + publicIPs = append(publicIPs, publicIP) + } } - if localEndpointSpec.PublicIP != publicIP { - logger.Infof("Public IP changed for the Gateway, updating the local endpoint with public IP %q obtained from resolver %q", - publicIP, resolver) + if len(publicIPs) > 0 { + logger.Infof("Public IPs changed for the Gateway, updating the local endpoint with public IPs %q", publicIPs) - if err := p.updateLocalEndpoint(publicIP); err != nil { + if err := p.updateLocalEndpoint(publicIPs); err != nil { logger.Error(err, "Error updating the public IP for local endpoint") return } } } -func (p *PublicIPWatcher) updateLocalEndpoint(publicIP string) error { +func (p *PublicIPWatcher) updateLocalEndpoint(publicIPs []string) error { err := p.config.LocalEndpoint.Update(context.TODO(), func(existing *submv1.EndpointSpec) { - existing.PublicIP = publicIP + for _, publicIP := range publicIPs { + existing.SetPublicIP(publicIP) + } }) return errors.Wrap(err, "error updating the public IP of the local endpoint") diff --git a/pkg/gateway/gateway.go b/pkg/gateway/gateway.go index a63e391b0..4abd42fc2 100644 --- a/pkg/gateway/gateway.go +++ b/pkg/gateway/gateway.go @@ -435,8 +435,9 @@ func submarinerClusterFrom(submSpec *types.SubmarinerSpecification) *types.Subma return &types.SubmarinerCluster{ ID: submSpec.ClusterID, Spec: subv1.ClusterSpec{ - ClusterID: submSpec.ClusterID, - ColorCodes: []string{"blue"}, // This is a fake value, used only for upgrade purposes + ClusterID: submSpec.ClusterID, + ColorCodes: []string{"blue"}, // This is a fake value, used only for upgrade purposes + // TODO_IPV6: delete cidr.ExtractIPv4Subnets ServiceCIDR: cidr.ExtractIPv4Subnets(submSpec.ServiceCidr), ClusterCIDR: cidr.ExtractIPv4Subnets(submSpec.ClusterCidr), GlobalCIDR: globalCIDR, diff --git a/pkg/types/types.go b/pkg/types/types.go index 6f5c8f3c9..528e3cbcc 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -20,6 +20,7 @@ package types import ( subv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1" + k8snet "k8s.io/utils/net" ) type SubmarinerCluster struct { @@ -50,3 +51,11 @@ type SubmarinerSpecification struct { HealthCheckMaxPacketLossCount int MetricsPort int `default:"32780"` } + +func (subSpec *SubmarinerSpecification) GetIPFamilies() [2]k8snet.IPFamily { + var ipFamilies [2]k8snet.IPFamily + // TODO_IPV6: set ipFamilies according to ClusterCidr content + ipFamilies[0] = k8snet.IPv4 + + return ipFamilies +}