Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gateway - use endpoint set private, public and healthcheck IPs #3279

Merged
merged 2 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 39 additions & 14 deletions pkg/endpoint/local_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
k8snet "k8s.io/utils/net"
"k8s.io/utils/set"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)
Expand Down Expand Up @@ -136,9 +137,6 @@ func (l *Local) Create(ctx context.Context) error {
func GetLocalSpec(ctx context.Context, submSpec *types.SubmarinerSpecification, k8sClient kubernetes.Interface,
airGappedDeployment bool,
) (*submv1.EndpointSpec, error) {
// We'll panic if submSpec is nil, this is intentional
privateIP := GetLocalIP()

gwNode, err := node.GetLocalNode(ctx, k8sClient)
if err != nil {
return nil, errors.Wrap(err, "getting information on the local node")
Expand All @@ -157,6 +155,7 @@ func GetLocalSpec(ctx context.Context, submSpec *types.SubmarinerSpecification,
localSubnets = submSpec.GlobalCidr
globalnetEnabled = true
} else {
// TODO_IPV6: set localSubnets to submSpec.ServiceCidr + submSpec.ClusterCidr
localSubnets = append(localSubnets, cidr.ExtractIPv4Subnets(submSpec.ServiceCidr)...)
localSubnets = append(localSubnets, cidr.ExtractIPv4Subnets(submSpec.ClusterCidr)...)
}
Expand All @@ -171,40 +170,66 @@ func GetLocalSpec(ctx context.Context, submSpec *types.SubmarinerSpecification,
}

endpointSpec := &submv1.EndpointSpec{
CableName: fmt.Sprintf("submariner-cable-%s-%s", submSpec.ClusterID, strings.ReplaceAll(privateIP, ".", "-")),
ClusterID: submSpec.ClusterID,
Hostname: hostname,
PrivateIP: privateIP,
NATEnabled: submSpec.NATEnabled,
Subnets: localSubnets,
Backend: submSpec.CableDriver,
BackendConfig: backendConfig,
}

publicIP, resolver, err := getPublicIP(submSpec, k8sClient, backendConfig, airGappedDeployment)
if err != nil {
return nil, errors.Wrap(err, "could not determine public IP")
for _, family := range submSpec.GetIPFamilies() {
endpointSpec.SetPrivateIP(GetLocalIP(family))
}

logger.Infof("Obtained local endpoint public IP %q using resolver %q", publicIP, resolver)
endpointSpec.CableName = fmt.Sprintf("submariner-cable-%s-%s", submSpec.ClusterID,
strings.ReplaceAll(endpointSpec.GetPrivateIP(k8snet.IPv4), ".", "-"))
yboaron marked this conversation as resolved.
Show resolved Hide resolved

for _, family := range submSpec.GetIPFamilies() {
publicIP, resolver, err := getPublicIP(family, submSpec, k8sClient, backendConfig, airGappedDeployment)
if err != nil {
return nil, errors.Wrapf(err, "could not determine public IP%v", family)
}

endpointSpec.PublicIP = publicIP
logger.Infof("Obtained local endpoint public IP%v %q using resolver %q", family, publicIP, resolver)
endpointSpec.SetPublicIP(publicIP)
}

if submSpec.HealthCheckEnabled && !globalnetEnabled {
// When globalnet is enabled, HealthCheckIP will be the globalIP assigned to the Active GatewayNode.
// In a fresh deployment, globalIP annotation for the node might take few seconds. So we listen on NodeEvents
// and update the endpoint HealthCheckIP (to globalIP) in datastoreSyncer at a later stage. This will trigger
// the HealthCheck between the clusters.
for _, family := range submSpec.GetIPFamilies() {
healthcheckIP, err := getHealthCheckIP(family, submSpec)
if err != nil {
return nil, fmt.Errorf("error getting HealthCheckIP%v: %w", family, err)
}

endpointSpec.SetHealthCheckIP(healthcheckIP)
}
}

return endpointSpec, nil
}

func getHealthCheckIP(family k8snet.IPFamily, submSpec *types.SubmarinerSpecification) (string, error) {
switch family {
case k8snet.IPv4:
cniIface, err := cni.Discover(submSpec.ClusterCidr)
if err != nil {
return nil, fmt.Errorf("error getting CNI Interface IP address: %w."+
"Please disable the health check if your CNI does not expose a pod IP on the nodes", err)
return "", errors.Wrapf(err, "error getting CNI Interface IP address."+
"Please disable the health check if your CNI does not expose a pod IP on the nodes")
}

endpointSpec.HealthCheckIP = cniIface.IPAddress
return cniIface.IPAddress, nil
case k8snet.IPv6:
// TODO_IPV6: add V6 healthcheck IP

case k8snet.IPFamilyUnknown:
}

return endpointSpec, nil
return "", nil
}

func getBackendConfig(nodeObj *v1.Node) (map[string]string, error) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/endpoint/local_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme"
k8snet "k8s.io/utils/net"
)

const testNodeName = "this-node"
Expand All @@ -46,7 +47,7 @@ var _ = Describe("GetLocalSpec", func() {
node *v1.Node
)

testPrivateIP := endpoint.GetLocalIP()
testPrivateIP := endpoint.GetLocalIP(k8snet.IPv4)

const (
testIPv4Label = "ipv4:"
Expand Down
14 changes: 12 additions & 2 deletions pkg/endpoint/local_ip.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package endpoint

import (
"net"

k8snet "k8s.io/utils/net"
)

func GetLocalIPForDestination(dst string) string {
Expand All @@ -34,6 +36,14 @@ func GetLocalIPForDestination(dst string) string {
return localAddr.IP.String()
}

func GetLocalIP() string {
return GetLocalIPForDestination("8.8.8.8")
func GetLocalIP(family k8snet.IPFamily) string {
switch family {
case k8snet.IPv4:
return GetLocalIPForDestination("8.8.8.8")
case k8snet.IPv6:
// TODO_IPV6: add V6 healthcheck IP
case k8snet.IPFamilyUnknown:
}

return ""
}
72 changes: 40 additions & 32 deletions pkg/endpoint/public_ip.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/retry"
k8snet "k8s.io/utils/net"
)

type publicIPResolverFunction func(clientset kubernetes.Interface, namespace, value string) (string, error)
Expand All @@ -63,51 +64,58 @@ func getPublicIPResolvers() string {
return strings.Join(serverList, ",")
}

func getPublicIP(submSpec *types.SubmarinerSpecification, k8sClient kubernetes.Interface,
func getPublicIP(family k8snet.IPFamily, submSpec *types.SubmarinerSpecification, k8sClient kubernetes.Interface,
backendConfig map[string]string, airGapped bool,
) (string, string, error) {
// If the node is annotated with a public-ip, the same is used as the public-ip of local endpoint.
config, ok := backendConfig[v1.PublicIP]
if !ok {
if submSpec.PublicIP != "" {
config = submSpec.PublicIP
} else {
config = getPublicIPResolvers()
switch family {
case k8snet.IPv4:
// If the node is annotated with a public-ip, the same is used as the public-ip of local endpoint.
config, ok := backendConfig[v1.PublicIP]
if !ok {
if submSpec.PublicIP != "" {
config = submSpec.PublicIP
} else {
config = getPublicIPResolvers()
}
}
}

if airGapped {
ip, resolver, err := resolveIPInAirGappedDeployment(k8sClient, submSpec.Namespace, config)
if err != nil {
logger.Errorf(err, "Error resolving public IP in an air-gapped deployment, using empty value: %s", config)
return "", "", nil
if airGapped {
ip, resolver, err := resolveIPInAirGappedDeployment(k8sClient, submSpec.Namespace, config)
if err != nil {
logger.Errorf(err, "Error resolving public IP%s in an air-gapped deployment, using empty value: %s", family, config)
return "", "", nil
}

return ip, resolver, nil
}

return ip, resolver, nil
}
resolvers := strings.Split(config, ",")
errs := make([]error, 0, len(resolvers))

resolvers := strings.Split(config, ",")
errs := make([]error, 0, len(resolvers))
for _, resolver := range resolvers {
resolver = strings.Trim(resolver, " ")

for _, resolver := range resolvers {
resolver = strings.Trim(resolver, " ")
parts := strings.Split(resolver, ":")
if len(parts) != 2 {
return "", "", errors.Errorf("invalid format for %q annotation: %q", v1.GatewayConfigPrefix+v1.PublicIP, config)
}

parts := strings.Split(resolver, ":")
if len(parts) != 2 {
return "", "", errors.Errorf("invalid format for %q annotation: %q", v1.GatewayConfigPrefix+v1.PublicIP, config)
}
ip, err := resolvePublicIP(k8sClient, submSpec.Namespace, parts)
if err == nil {
return ip, resolver, nil
}

ip, err := resolvePublicIP(k8sClient, submSpec.Namespace, parts)
if err == nil {
return ip, resolver, nil
// If this resolver failed, we log it, but we fall back to the next one
errs = append(errs, errors.Wrapf(err, "\nResolver[%q]", resolver))
}

// If this resolver failed, we log it, but we fall back to the next one
errs = append(errs, errors.Wrapf(err, "\nResolver[%q]", resolver))
}
if len(resolvers) > 0 {
return "", "", errors.Wrapf(k8serrors.NewAggregate(errs), "Unable to resolve public IP by any of the resolver methods")
}

if len(resolvers) > 0 {
return "", "", errors.Wrapf(k8serrors.NewAggregate(errs), "Unable to resolve public IP by any of the resolver methods")
case k8snet.IPv6:
// TODO_IPV6: add V6 healthcheck IP
case k8snet.IPFamilyUnknown:
}

return "", "", nil
Expand Down
19 changes: 10 additions & 9 deletions pkg/endpoint/public_ip_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
v1 "k8s.io/api/core/v1"
v1meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
k8snet "k8s.io/utils/net"
)

var _ = Describe("firstIPv4InString", func() {
Expand Down Expand Up @@ -81,7 +82,7 @@ var _ = Describe("public ip resolvers", func() {
It("should return the IP", func() {
backendConfig[publicIPConfig] = lbPublicIP
client := fake.NewClientset(serviceWithIngress(v1.LoadBalancerIngress{Hostname: "", IP: testIP}))
ip, resolver, err := getPublicIP(submSpec, client, backendConfig, false)
ip, resolver, err := getPublicIP(k8snet.IPv4, submSpec, client, backendConfig, false)
Expect(err).ToNot(HaveOccurred())
Expect(ip).To(Equal(testIP))
Expect(resolver).To(Equal(lbPublicIP))
Expand All @@ -95,7 +96,7 @@ var _ = Describe("public ip resolvers", func() {
Hostname: dnsHost,
IP: "",
}))
ip, resolver, err := getPublicIP(submSpec, client, backendConfig, false)
ip, resolver, err := getPublicIP(k8snet.IPv4, submSpec, client, backendConfig, false)
Expect(err).ToNot(HaveOccurred())
Expect(ip).To(Equal(testIPDNS))
Expect(resolver).To(Equal(lbPublicIP))
Expand All @@ -109,7 +110,7 @@ var _ = Describe("public ip resolvers", func() {
loadBalancerRetryConfig.Steps = 1
backendConfig[publicIPConfig] = lbPublicIP
client := fake.NewClientset(serviceWithIngress())
_, _, err := getPublicIP(submSpec, client, backendConfig, false)
_, _, err := getPublicIP(k8snet.IPv4, submSpec, client, backendConfig, false)
Expect(err).To(HaveOccurred())
})
})
Expand All @@ -118,7 +119,7 @@ var _ = Describe("public ip resolvers", func() {
It("should return the IP", func() {
backendConfig[publicIPConfig] = ipv4PublicIP
client := fake.NewClientset()
ip, resolver, err := getPublicIP(submSpec, client, backendConfig, false)
ip, resolver, err := getPublicIP(k8snet.IPv4, submSpec, client, backendConfig, false)
Expect(err).ToNot(HaveOccurred())
Expect(ip).To(Equal(testIP))
Expect(resolver).To(Equal(ipv4PublicIP))
Expand All @@ -129,7 +130,7 @@ var _ = Describe("public ip resolvers", func() {
It("should return the IP and not an empty value", func() {
backendConfig[publicIPConfig] = ipv4PublicIP
client := fake.NewClientset()
ip, resolver, err := getPublicIP(submSpec, client, backendConfig, true)
ip, resolver, err := getPublicIP(k8snet.IPv4, submSpec, client, backendConfig, true)
Expect(err).ToNot(HaveOccurred())
Expect(ip).To(Equal(testIP))
Expect(resolver).To(Equal(ipv4PublicIP))
Expand All @@ -140,7 +141,7 @@ var _ = Describe("public ip resolvers", func() {
It("should return the IP", func() {
backendConfig[publicIPConfig] = "dns:" + dnsHost
client := fake.NewClientset()
ip, resolver, err := getPublicIP(submSpec, client, backendConfig, false)
ip, resolver, err := getPublicIP(k8snet.IPv4, submSpec, client, backendConfig, false)
Expect(err).ToNot(HaveOccurred())
Expect(ip).To(Equal(testIPDNS))
Expect(resolver).To(Equal(backendConfig[publicIPConfig]))
Expand All @@ -151,7 +152,7 @@ var _ = Describe("public ip resolvers", func() {
It("should return some IP", func() {
backendConfig[publicIPConfig] = "api:4.icanhazip.com/"
client := fake.NewClientset()
ip, resolver, err := getPublicIP(submSpec, client, backendConfig, false)
ip, resolver, err := getPublicIP(k8snet.IPv4, submSpec, client, backendConfig, false)
Expect(err).ToNot(HaveOccurred())
Expect(net.ParseIP(ip)).NotTo(BeNil())
Expect(resolver).To(Equal(backendConfig[publicIPConfig]))
Expand All @@ -162,7 +163,7 @@ var _ = Describe("public ip resolvers", func() {
It("should return the first working one", func() {
backendConfig[publicIPConfig] = ipv4PublicIP + ",dns:" + dnsHost
client := fake.NewClientset()
ip, resolver, err := getPublicIP(submSpec, client, backendConfig, false)
ip, resolver, err := getPublicIP(k8snet.IPv4, submSpec, client, backendConfig, false)
Expect(err).ToNot(HaveOccurred())
Expect(ip).To(Equal(testIP))
Expect(resolver).To(Equal(ipv4PublicIP))
Expand All @@ -173,7 +174,7 @@ var _ = Describe("public ip resolvers", func() {
It("should return the first working one", func() {
backendConfig[publicIPConfig] = "dns:thisdomaindoesntexistforsure.badbadbad," + ipv4PublicIP
client := fake.NewClientset()
ip, resolver, err := getPublicIP(submSpec, client, backendConfig, false)
ip, resolver, err := getPublicIP(k8snet.IPv4, submSpec, client, backendConfig, false)
Expect(err).ToNot(HaveOccurred())
Expect(ip).To(Equal(testIP))
Expect(resolver).To(Equal(ipv4PublicIP))
Expand Down
28 changes: 18 additions & 10 deletions pkg/endpoint/public_ip_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,28 +63,36 @@ func (p *PublicIPWatcher) Run(stopCh <-chan struct{}) {
}

func (p *PublicIPWatcher) syncPublicIP() {
var publicIPs []string
localEndpointSpec := p.config.LocalEndpoint.Spec()

publicIP, resolver, err := getPublicIP(p.config.SubmSpec, p.config.K8sClient, localEndpointSpec.BackendConfig, false)
if err != nil {
logger.Warningf("Could not determine public IP of the gateway node %q: %v", localEndpointSpec.Hostname, err)
return
for _, family := range p.config.SubmSpec.GetIPFamilies() {
publicIP, _, err := getPublicIP(family, p.config.SubmSpec, p.config.K8sClient, localEndpointSpec.BackendConfig, false)
if err != nil {
logger.Warningf("Could not determine public IP for family %v of the gateway node %q: %v", family, localEndpointSpec.Hostname, err)
continue
}

if publicIP != localEndpointSpec.GetPublicIP(family) {
publicIPs = append(publicIPs, publicIP)
}
yboaron marked this conversation as resolved.
Show resolved Hide resolved
}

if localEndpointSpec.PublicIP != publicIP {
logger.Infof("Public IP changed for the Gateway, updating the local endpoint with public IP %q obtained from resolver %q",
publicIP, resolver)
if len(publicIPs) > 0 {
logger.Infof("Public IPs changed for the Gateway, updating the local endpoint with public IPs %q", publicIPs)

if err := p.updateLocalEndpoint(publicIP); err != nil {
if err := p.updateLocalEndpoint(publicIPs); err != nil {
logger.Error(err, "Error updating the public IP for local endpoint")
return
}
}
}

func (p *PublicIPWatcher) updateLocalEndpoint(publicIP string) error {
func (p *PublicIPWatcher) updateLocalEndpoint(publicIPs []string) error {
err := p.config.LocalEndpoint.Update(context.TODO(), func(existing *submv1.EndpointSpec) {
existing.PublicIP = publicIP
for _, publicIP := range publicIPs {
existing.SetPublicIP(publicIP)
}
})

return errors.Wrap(err, "error updating the public IP of the local endpoint")
Expand Down
5 changes: 3 additions & 2 deletions pkg/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,8 +435,9 @@ func submarinerClusterFrom(submSpec *types.SubmarinerSpecification) *types.Subma
return &types.SubmarinerCluster{
ID: submSpec.ClusterID,
Spec: subv1.ClusterSpec{
ClusterID: submSpec.ClusterID,
ColorCodes: []string{"blue"}, // This is a fake value, used only for upgrade purposes
ClusterID: submSpec.ClusterID,
ColorCodes: []string{"blue"}, // This is a fake value, used only for upgrade purposes
// TODO_IPV6: delete cidr.ExtractIPv4Subnets
ServiceCIDR: cidr.ExtractIPv4Subnets(submSpec.ServiceCidr),
ClusterCIDR: cidr.ExtractIPv4Subnets(submSpec.ClusterCidr),
GlobalCIDR: globalCIDR,
Expand Down
Loading
Loading