Skip to content

Commit

Permalink
Gateway - use endpoint set private, public and healthcheck IPs
Browse files Browse the repository at this point in the history
This PR :
- updates gateway code to use endpoint SetHealthCheckIP
,SetPrivateIP and SetPublicIP.
- add placeholders for IPV6 public,private and healthcheck IPs.

Signed-off-by: Yossi Boaron <[email protected]>
  • Loading branch information
yboaron committed Jan 22, 2025
1 parent 2c0e7b5 commit 4434394
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 70 deletions.
53 changes: 39 additions & 14 deletions pkg/endpoint/local_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
k8snet "k8s.io/utils/net"
"k8s.io/utils/set"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)
Expand Down Expand Up @@ -136,9 +137,6 @@ func (l *Local) Create(ctx context.Context) error {
func GetLocalSpec(ctx context.Context, submSpec *types.SubmarinerSpecification, k8sClient kubernetes.Interface,
airGappedDeployment bool,
) (*submv1.EndpointSpec, error) {
// We'll panic if submSpec is nil, this is intentional
privateIP := GetLocalIP()

gwNode, err := node.GetLocalNode(ctx, k8sClient)
if err != nil {
return nil, errors.Wrap(err, "getting information on the local node")
Expand All @@ -157,6 +155,7 @@ func GetLocalSpec(ctx context.Context, submSpec *types.SubmarinerSpecification,
localSubnets = submSpec.GlobalCidr
globalnetEnabled = true
} else {
// TODO_IPV6: set localSubnets to submSpec.ServiceCidr + submSpec.ClusterCidr
localSubnets = append(localSubnets, cidr.ExtractIPv4Subnets(submSpec.ServiceCidr)...)
localSubnets = append(localSubnets, cidr.ExtractIPv4Subnets(submSpec.ClusterCidr)...)
}
Expand All @@ -171,40 +170,66 @@ func GetLocalSpec(ctx context.Context, submSpec *types.SubmarinerSpecification,
}

endpointSpec := &submv1.EndpointSpec{
CableName: fmt.Sprintf("submariner-cable-%s-%s", submSpec.ClusterID, strings.ReplaceAll(privateIP, ".", "-")),
ClusterID: submSpec.ClusterID,
Hostname: hostname,
PrivateIP: privateIP,
NATEnabled: submSpec.NATEnabled,
Subnets: localSubnets,
Backend: submSpec.CableDriver,
BackendConfig: backendConfig,
}

publicIP, resolver, err := getPublicIP(submSpec, k8sClient, backendConfig, airGappedDeployment)
if err != nil {
return nil, errors.Wrap(err, "could not determine public IP")
for _, family := range submSpec.GetIPFamilies() {
endpointSpec.SetPrivateIP(GetLocalIP(family))
}

logger.Infof("Obtained local endpoint public IP %q using resolver %q", publicIP, resolver)
endpointSpec.CableName = fmt.Sprintf("submariner-cable-%s-%s", submSpec.ClusterID,
strings.ReplaceAll(endpointSpec.GetPrivateIP(k8snet.IPv4), ".", "-"))

for _, family := range submSpec.GetIPFamilies() {
publicIP, resolver, err := getPublicIP(family, submSpec, k8sClient, backendConfig, airGappedDeployment)
if err != nil {
return nil, errors.Wrapf(err, "could not determine public IP%v", family)
}

endpointSpec.PublicIP = publicIP
logger.Infof("Obtained local endpoint public IP%v %q using resolver %q", family, publicIP, resolver)
endpointSpec.SetPublicIP(publicIP)
}

if submSpec.HealthCheckEnabled && !globalnetEnabled {
// When globalnet is enabled, HealthCheckIP will be the globalIP assigned to the Active GatewayNode.
// In a fresh deployment, globalIP annotation for the node might take few seconds. So we listen on NodeEvents
// and update the endpoint HealthCheckIP (to globalIP) in datastoreSyncer at a later stage. This will trigger
// the HealthCheck between the clusters.
for _, family := range submSpec.GetIPFamilies() {
healthcheckIP, err := getHealthCheckIP(family, submSpec)
if err != nil {
return nil, fmt.Errorf("error getting HealthCheckIP%v: %w", family, err)
}

endpointSpec.SetHealthCheckIP(healthcheckIP)
}
}

return endpointSpec, nil
}

func getHealthCheckIP(family k8snet.IPFamily, submSpec *types.SubmarinerSpecification) (string, error) {
switch family {
case k8snet.IPv4:
cniIface, err := cni.Discover(submSpec.ClusterCidr)
if err != nil {
return nil, fmt.Errorf("error getting CNI Interface IP address: %w."+
"Please disable the health check if your CNI does not expose a pod IP on the nodes", err)
return "", errors.Wrapf(err, "error getting CNI Interface IP address."+
"Please disable the health check if your CNI does not expose a pod IP on the nodes")
}

endpointSpec.HealthCheckIP = cniIface.IPAddress
return cniIface.IPAddress, nil
case k8snet.IPv6:
// TODO_IPV6: add V6 healthcheck IP

case k8snet.IPFamilyUnknown:
}

return endpointSpec, nil
return "", nil
}

func getBackendConfig(nodeObj *v1.Node) (map[string]string, error) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/endpoint/local_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme"
k8snet "k8s.io/utils/net"
)

const testNodeName = "this-node"
Expand All @@ -46,7 +47,7 @@ var _ = Describe("GetLocalSpec", func() {
node *v1.Node
)

testPrivateIP := endpoint.GetLocalIP()
testPrivateIP := endpoint.GetLocalIP(k8snet.IPv4)

const (
testIPv4Label = "ipv4:"
Expand Down
14 changes: 12 additions & 2 deletions pkg/endpoint/local_ip.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package endpoint

import (
"net"

k8snet "k8s.io/utils/net"
)

func GetLocalIPForDestination(dst string) string {
Expand All @@ -34,6 +36,14 @@ func GetLocalIPForDestination(dst string) string {
return localAddr.IP.String()
}

func GetLocalIP() string {
return GetLocalIPForDestination("8.8.8.8")
func GetLocalIP(family k8snet.IPFamily) string {
switch family {
case k8snet.IPv4:
return GetLocalIPForDestination("8.8.8.8")
case k8snet.IPv6:
// TODO_IPV6: add V6 healthcheck IP
case k8snet.IPFamilyUnknown:
}

return ""
}
72 changes: 40 additions & 32 deletions pkg/endpoint/public_ip.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/retry"
k8snet "k8s.io/utils/net"
)

type publicIPResolverFunction func(clientset kubernetes.Interface, namespace, value string) (string, error)
Expand All @@ -63,51 +64,58 @@ func getPublicIPResolvers() string {
return strings.Join(serverList, ",")
}

func getPublicIP(submSpec *types.SubmarinerSpecification, k8sClient kubernetes.Interface,
func getPublicIP(family k8snet.IPFamily, submSpec *types.SubmarinerSpecification, k8sClient kubernetes.Interface,
backendConfig map[string]string, airGapped bool,
) (string, string, error) {
// If the node is annotated with a public-ip, the same is used as the public-ip of local endpoint.
config, ok := backendConfig[v1.PublicIP]
if !ok {
if submSpec.PublicIP != "" {
config = submSpec.PublicIP
} else {
config = getPublicIPResolvers()
switch family {
case k8snet.IPv4:
// If the node is annotated with a public-ip, the same is used as the public-ip of local endpoint.
config, ok := backendConfig[v1.PublicIP]
if !ok {
if submSpec.PublicIP != "" {
config = submSpec.PublicIP
} else {
config = getPublicIPResolvers()
}
}
}

if airGapped {
ip, resolver, err := resolveIPInAirGappedDeployment(k8sClient, submSpec.Namespace, config)
if err != nil {
logger.Errorf(err, "Error resolving public IP in an air-gapped deployment, using empty value: %s", config)
return "", "", nil
if airGapped {
ip, resolver, err := resolveIPInAirGappedDeployment(k8sClient, submSpec.Namespace, config)
if err != nil {
logger.Errorf(err, "Error resolving public IP%s in an air-gapped deployment, using empty value: %s", family, config)
return "", "", nil
}

return ip, resolver, nil
}

return ip, resolver, nil
}
resolvers := strings.Split(config, ",")
errs := make([]error, 0, len(resolvers))

resolvers := strings.Split(config, ",")
errs := make([]error, 0, len(resolvers))
for _, resolver := range resolvers {
resolver = strings.Trim(resolver, " ")

for _, resolver := range resolvers {
resolver = strings.Trim(resolver, " ")
parts := strings.Split(resolver, ":")
if len(parts) != 2 {
return "", "", errors.Errorf("invalid format for %q annotation: %q", v1.GatewayConfigPrefix+v1.PublicIP, config)
}

parts := strings.Split(resolver, ":")
if len(parts) != 2 {
return "", "", errors.Errorf("invalid format for %q annotation: %q", v1.GatewayConfigPrefix+v1.PublicIP, config)
}
ip, err := resolvePublicIP(k8sClient, submSpec.Namespace, parts)
if err == nil {
return ip, resolver, nil
}

ip, err := resolvePublicIP(k8sClient, submSpec.Namespace, parts)
if err == nil {
return ip, resolver, nil
// If this resolver failed, we log it, but we fall back to the next one
errs = append(errs, errors.Wrapf(err, "\nResolver[%q]", resolver))
}

// If this resolver failed, we log it, but we fall back to the next one
errs = append(errs, errors.Wrapf(err, "\nResolver[%q]", resolver))
}
if len(resolvers) > 0 {
return "", "", errors.Wrapf(k8serrors.NewAggregate(errs), "Unable to resolve public IP by any of the resolver methods")
}

if len(resolvers) > 0 {
return "", "", errors.Wrapf(k8serrors.NewAggregate(errs), "Unable to resolve public IP by any of the resolver methods")
case k8snet.IPv6:
// TODO_IPV6: add V6 healthcheck IP
case k8snet.IPFamilyUnknown:
}

return "", "", nil
Expand Down
19 changes: 10 additions & 9 deletions pkg/endpoint/public_ip_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
v1 "k8s.io/api/core/v1"
v1meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
k8snet "k8s.io/utils/net"
)

var _ = Describe("firstIPv4InString", func() {
Expand Down Expand Up @@ -81,7 +82,7 @@ var _ = Describe("public ip resolvers", func() {
It("should return the IP", func() {
backendConfig[publicIPConfig] = lbPublicIP
client := fake.NewClientset(serviceWithIngress(v1.LoadBalancerIngress{Hostname: "", IP: testIP}))
ip, resolver, err := getPublicIP(submSpec, client, backendConfig, false)
ip, resolver, err := getPublicIP(k8snet.IPv4, submSpec, client, backendConfig, false)
Expect(err).ToNot(HaveOccurred())
Expect(ip).To(Equal(testIP))
Expect(resolver).To(Equal(lbPublicIP))
Expand All @@ -95,7 +96,7 @@ var _ = Describe("public ip resolvers", func() {
Hostname: dnsHost,
IP: "",
}))
ip, resolver, err := getPublicIP(submSpec, client, backendConfig, false)
ip, resolver, err := getPublicIP(k8snet.IPv4, submSpec, client, backendConfig, false)
Expect(err).ToNot(HaveOccurred())
Expect(ip).To(Equal(testIPDNS))
Expect(resolver).To(Equal(lbPublicIP))
Expand All @@ -109,7 +110,7 @@ var _ = Describe("public ip resolvers", func() {
loadBalancerRetryConfig.Steps = 1
backendConfig[publicIPConfig] = lbPublicIP
client := fake.NewClientset(serviceWithIngress())
_, _, err := getPublicIP(submSpec, client, backendConfig, false)
_, _, err := getPublicIP(k8snet.IPv4, submSpec, client, backendConfig, false)
Expect(err).To(HaveOccurred())
})
})
Expand All @@ -118,7 +119,7 @@ var _ = Describe("public ip resolvers", func() {
It("should return the IP", func() {
backendConfig[publicIPConfig] = ipv4PublicIP
client := fake.NewClientset()
ip, resolver, err := getPublicIP(submSpec, client, backendConfig, false)
ip, resolver, err := getPublicIP(k8snet.IPv4, submSpec, client, backendConfig, false)
Expect(err).ToNot(HaveOccurred())
Expect(ip).To(Equal(testIP))
Expect(resolver).To(Equal(ipv4PublicIP))
Expand All @@ -129,7 +130,7 @@ var _ = Describe("public ip resolvers", func() {
It("should return the IP and not an empty value", func() {
backendConfig[publicIPConfig] = ipv4PublicIP
client := fake.NewClientset()
ip, resolver, err := getPublicIP(submSpec, client, backendConfig, true)
ip, resolver, err := getPublicIP(k8snet.IPv4, submSpec, client, backendConfig, true)
Expect(err).ToNot(HaveOccurred())
Expect(ip).To(Equal(testIP))
Expect(resolver).To(Equal(ipv4PublicIP))
Expand All @@ -140,7 +141,7 @@ var _ = Describe("public ip resolvers", func() {
It("should return the IP", func() {
backendConfig[publicIPConfig] = "dns:" + dnsHost
client := fake.NewClientset()
ip, resolver, err := getPublicIP(submSpec, client, backendConfig, false)
ip, resolver, err := getPublicIP(k8snet.IPv4, submSpec, client, backendConfig, false)
Expect(err).ToNot(HaveOccurred())
Expect(ip).To(Equal(testIPDNS))
Expect(resolver).To(Equal(backendConfig[publicIPConfig]))
Expand All @@ -151,7 +152,7 @@ var _ = Describe("public ip resolvers", func() {
It("should return some IP", func() {
backendConfig[publicIPConfig] = "api:4.icanhazip.com/"
client := fake.NewClientset()
ip, resolver, err := getPublicIP(submSpec, client, backendConfig, false)
ip, resolver, err := getPublicIP(k8snet.IPv4, submSpec, client, backendConfig, false)
Expect(err).ToNot(HaveOccurred())
Expect(net.ParseIP(ip)).NotTo(BeNil())
Expect(resolver).To(Equal(backendConfig[publicIPConfig]))
Expand All @@ -162,7 +163,7 @@ var _ = Describe("public ip resolvers", func() {
It("should return the first working one", func() {
backendConfig[publicIPConfig] = ipv4PublicIP + ",dns:" + dnsHost
client := fake.NewClientset()
ip, resolver, err := getPublicIP(submSpec, client, backendConfig, false)
ip, resolver, err := getPublicIP(k8snet.IPv4, submSpec, client, backendConfig, false)
Expect(err).ToNot(HaveOccurred())
Expect(ip).To(Equal(testIP))
Expect(resolver).To(Equal(ipv4PublicIP))
Expand All @@ -173,7 +174,7 @@ var _ = Describe("public ip resolvers", func() {
It("should return the first working one", func() {
backendConfig[publicIPConfig] = "dns:thisdomaindoesntexistforsure.badbadbad," + ipv4PublicIP
client := fake.NewClientset()
ip, resolver, err := getPublicIP(submSpec, client, backendConfig, false)
ip, resolver, err := getPublicIP(k8snet.IPv4, submSpec, client, backendConfig, false)
Expect(err).ToNot(HaveOccurred())
Expect(ip).To(Equal(testIP))
Expect(resolver).To(Equal(ipv4PublicIP))
Expand Down
28 changes: 18 additions & 10 deletions pkg/endpoint/public_ip_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,28 +63,36 @@ func (p *PublicIPWatcher) Run(stopCh <-chan struct{}) {
}

func (p *PublicIPWatcher) syncPublicIP() {
var publicIPs []string
localEndpointSpec := p.config.LocalEndpoint.Spec()

publicIP, resolver, err := getPublicIP(p.config.SubmSpec, p.config.K8sClient, localEndpointSpec.BackendConfig, false)
if err != nil {
logger.Warningf("Could not determine public IP of the gateway node %q: %v", localEndpointSpec.Hostname, err)
return
for _, family := range p.config.SubmSpec.GetIPFamilies() {
publicIP, _, err := getPublicIP(family, p.config.SubmSpec, p.config.K8sClient, localEndpointSpec.BackendConfig, false)
if err != nil {
logger.Warningf("Could not determine public IP for family %v of the gateway node %q: %v", family, localEndpointSpec.Hostname, err)
continue
}

if publicIP != localEndpointSpec.GetPublicIP(family) {
publicIPs = append(publicIPs, publicIP)
}
}

if localEndpointSpec.PublicIP != publicIP {
logger.Infof("Public IP changed for the Gateway, updating the local endpoint with public IP %q obtained from resolver %q",
publicIP, resolver)
if len(publicIPs) > 0 {
logger.Infof("Public IPs changed for the Gateway, updating the local endpoint with public IPs %q", publicIPs)

if err := p.updateLocalEndpoint(publicIP); err != nil {
if err := p.updateLocalEndpoint(publicIPs); err != nil {
logger.Error(err, "Error updating the public IP for local endpoint")
return
}
}
}

func (p *PublicIPWatcher) updateLocalEndpoint(publicIP string) error {
func (p *PublicIPWatcher) updateLocalEndpoint(publicIPs []string) error {
err := p.config.LocalEndpoint.Update(context.TODO(), func(existing *submv1.EndpointSpec) {
existing.PublicIP = publicIP
for _, publicIP := range publicIPs {
existing.SetPublicIP(publicIP)
}
})

return errors.Wrap(err, "error updating the public IP of the local endpoint")
Expand Down
5 changes: 3 additions & 2 deletions pkg/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,8 +435,9 @@ func submarinerClusterFrom(submSpec *types.SubmarinerSpecification) *types.Subma
return &types.SubmarinerCluster{
ID: submSpec.ClusterID,
Spec: subv1.ClusterSpec{
ClusterID: submSpec.ClusterID,
ColorCodes: []string{"blue"}, // This is a fake value, used only for upgrade purposes
ClusterID: submSpec.ClusterID,
ColorCodes: []string{"blue"}, // This is a fake value, used only for upgrade purposes
// TODO_IPV6: delete cidr.ExtractIPv4Subnets
ServiceCIDR: cidr.ExtractIPv4Subnets(submSpec.ServiceCidr),
ClusterCIDR: cidr.ExtractIPv4Subnets(submSpec.ClusterCidr),
GlobalCIDR: globalCIDR,
Expand Down
Loading

0 comments on commit 4434394

Please sign in to comment.