Skip to content

Commit

Permalink
Gateway - use endpoint set private, public and healthcheck IPs
Browse files Browse the repository at this point in the history
This PR :
- updates gateway code to use endpoint SetHealthCheckIP
,SetPrivateIP and SetPublicIP.
- add placeholders for IPV6 public,private and healthcheck IPs.

Signed-off-by: Yossi Boaron <[email protected]>
  • Loading branch information
yboaron committed Jan 21, 2025
1 parent 2c0e7b5 commit b1326ef
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 25 deletions.
54 changes: 41 additions & 13 deletions pkg/endpoint/local_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,11 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
k8serrors "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
k8snet "k8s.io/utils/net"
"k8s.io/utils/set"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)
Expand Down Expand Up @@ -137,7 +139,7 @@ func GetLocalSpec(ctx context.Context, submSpec *types.SubmarinerSpecification,
airGappedDeployment bool,
) (*submv1.EndpointSpec, error) {
// We'll panic if submSpec is nil, this is intentional
privateIP := GetLocalIP()
privateIPs := GetLocalIPs(submSpec.GetIPFamilies())

gwNode, err := node.GetLocalNode(ctx, k8sClient)
if err != nil {
Expand All @@ -157,6 +159,7 @@ func GetLocalSpec(ctx context.Context, submSpec *types.SubmarinerSpecification,
localSubnets = submSpec.GlobalCidr
globalnetEnabled = true
} else {
// TODO_IPV6: set localSubnets to submSpec.ServiceCidr + submSpec.ClusterCidr
localSubnets = append(localSubnets, cidr.ExtractIPv4Subnets(submSpec.ServiceCidr)...)
localSubnets = append(localSubnets, cidr.ExtractIPv4Subnets(submSpec.ClusterCidr)...)
}
Expand All @@ -171,42 +174,67 @@ func GetLocalSpec(ctx context.Context, submSpec *types.SubmarinerSpecification,
}

endpointSpec := &submv1.EndpointSpec{
CableName: fmt.Sprintf("submariner-cable-%s-%s", submSpec.ClusterID, strings.ReplaceAll(privateIP, ".", "-")),
ClusterID: submSpec.ClusterID,
Hostname: hostname,
PrivateIP: privateIP,
NATEnabled: submSpec.NATEnabled,
Subnets: localSubnets,
Backend: submSpec.CableDriver,
BackendConfig: backendConfig,
}

publicIP, resolver, err := getPublicIP(submSpec, k8sClient, backendConfig, airGappedDeployment)
if err != nil {
return nil, errors.Wrap(err, "could not determine public IP")
for _, privateIP := range privateIPs {
endpointSpec.SetPrivateIP(privateIP)
}

logger.Infof("Obtained local endpoint public IP %q using resolver %q", publicIP, resolver)
endpointSpec.CableName = fmt.Sprintf("submariner-cable-%s-%s", submSpec.ClusterID,
strings.ReplaceAll(endpointSpec.GetPrivateIP(k8snet.IPv4), ".", "-"))
publicIPs, err := getPublicIPs(submSpec, k8sClient, backendConfig, airGappedDeployment)

if len(publicIPs) == 0 {
return nil, errors.Wrap(err, "could not determine public IP")
}

endpointSpec.PublicIP = publicIP
for _, publicIP := range publicIPs {
endpointSpec.SetPublicIP(publicIP)
}

if submSpec.HealthCheckEnabled && !globalnetEnabled {
// When globalnet is enabled, HealthCheckIP will be the globalIP assigned to the Active GatewayNode.
// In a fresh deployment, globalIP annotation for the node might take few seconds. So we listen on NodeEvents
// and update the endpoint HealthCheckIP (to globalIP) in datastoreSyncer at a later stage. This will trigger
// the HealthCheck between the clusters.
cniIface, err := cni.Discover(submSpec.ClusterCidr)
if err != nil {
return nil, fmt.Errorf("error getting CNI Interface IP address: %w."+
"Please disable the health check if your CNI does not expose a pod IP on the nodes", err)
healthcheckIPs, err := getHealthCheckIPs(submSpec)
if len(healthcheckIPs) == 0 {
return nil, errors.Wrap(err, "could not determine healthcheck IP")
}

endpointSpec.HealthCheckIP = cniIface.IPAddress
for _, healthcheckIP := range healthcheckIPs {
endpointSpec.SetHealthCheckIP(healthcheckIP)
}
}

return endpointSpec, nil
}

func getHealthCheckIPs(submSpec *types.SubmarinerSpecification) ([]string, error) {
var healthCheckIPs []string
var errs []error

for _, family := range submSpec.GetIPFamilies() {
if family == k8snet.IPv4 {
cniIface, err := cni.Discover(submSpec.ClusterCidr)
if err != nil {
errs = append(errs, fmt.Errorf("error getting CNI Interface IP address: %w."+
"Please disable the health check if your CNI does not expose a pod IP on the nodes", err))
} else {
healthCheckIPs = append(healthCheckIPs, cniIface.IPAddress)
}
} // TODO_IPV6: add V6 healthcheck IP
}

return healthCheckIPs, errors.Wrap(k8serrors.NewAggregate(errs), "getHealthCheckIPs failed")
}

func getBackendConfig(nodeObj *v1.Node) (map[string]string, error) {
backendConfig, err := getNodeBackendConfig(nodeObj)
if err != nil {
Expand Down
14 changes: 14 additions & 0 deletions pkg/endpoint/local_ip.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package endpoint

import (
"net"

k8snet "k8s.io/utils/net"
)

func GetLocalIPForDestination(dst string) string {
Expand All @@ -37,3 +39,15 @@ func GetLocalIPForDestination(dst string) string {
func GetLocalIP() string {
return GetLocalIPForDestination("8.8.8.8")
}

func GetLocalIPs(ipFamilies [2]k8snet.IPFamily) []string {
var addresses []string

for _, family := range ipFamilies {
if family == k8snet.IPv4 {
addresses = append(addresses, GetLocalIP())
} // TODO_IPV6: add GetLocalIPV6
}

return addresses
}
22 changes: 22 additions & 0 deletions pkg/endpoint/public_ip.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/retry"
k8snet "k8s.io/utils/net"
)

type publicIPResolverFunction func(clientset kubernetes.Interface, namespace, value string) (string, error)
Expand All @@ -63,6 +64,27 @@ func getPublicIPResolvers() string {
return strings.Join(serverList, ",")
}

func getPublicIPs(submSpec *types.SubmarinerSpecification, k8sClient kubernetes.Interface,
backendConfig map[string]string, airGapped bool,
) ([]string, error) {
var publicIPs []string
var errs []error

for _, family := range submSpec.GetIPFamilies() {
if family == k8snet.IPv4 {
publicIP, resolver, err := getPublicIP(submSpec, k8sClient, backendConfig, airGapped)
if err != nil {
errs = append(errs, errors.Wrap(err, "could not determine public IP"))
} else {
logger.Infof("Obtained local endpoint public IP %q using resolver %q", publicIP, resolver)
publicIPs = append(publicIPs, publicIP)
}
} // TODO_IPV6: add getPublicIPV6
}

return publicIPs, errors.Wrap(k8serrors.NewAggregate(errs), "getPublicIPs failed")
}

func getPublicIP(submSpec *types.SubmarinerSpecification, k8sClient kubernetes.Interface,
backendConfig map[string]string, airGapped bool,
) (string, string, error) {
Expand Down
37 changes: 27 additions & 10 deletions pkg/endpoint/public_ip_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/submariner-io/submariner/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
k8snet "k8s.io/utils/net"
)

type PublicIPWatcherConfig struct {
Expand Down Expand Up @@ -63,28 +64,44 @@ func (p *PublicIPWatcher) Run(stopCh <-chan struct{}) {
}

func (p *PublicIPWatcher) syncPublicIP() {
var publicIPs []string
localEndpointSpec := p.config.LocalEndpoint.Spec()

publicIP, resolver, err := getPublicIP(p.config.SubmSpec, p.config.K8sClient, localEndpointSpec.BackendConfig, false)
if err != nil {
logger.Warningf("Could not determine public IP of the gateway node %q: %v", localEndpointSpec.Hostname, err)
return
publicIPsChanged := false

for _, family := range p.config.SubmSpec.GetIPFamilies() {
if family == k8snet.IPv4 {
publicIP, resolver, err := getPublicIP(p.config.SubmSpec, p.config.K8sClient, localEndpointSpec.BackendConfig, false)
if err != nil {
logger.Warningf("Could not determine public IPV4 of the gateway node %q: %v", localEndpointSpec.Hostname, err)
} else {
if publicIP != localEndpointSpec.GetPublicIP(k8snet.IPv4) {
publicIPsChanged = true

logger.Infof("Public IPv4 changed for the Gateway, updating the local endpoint with public IP %q obtained from resolver %q",
publicIP, resolver)
}

publicIPs = append(publicIPs, publicIP)
}
} // TODO_IPV6: add getPublicIPV6
}

if localEndpointSpec.PublicIP != publicIP {
logger.Infof("Public IP changed for the Gateway, updating the local endpoint with public IP %q obtained from resolver %q",
publicIP, resolver)
if publicIPsChanged {
logger.Infof("Public IPs changed for the Gateway, updating the local endpoint with public IPs %q", publicIPs)

if err := p.updateLocalEndpoint(publicIP); err != nil {
if err := p.updateLocalEndpoint(publicIPs); err != nil {
logger.Error(err, "Error updating the public IP for local endpoint")
return
}
}
}

func (p *PublicIPWatcher) updateLocalEndpoint(publicIP string) error {
func (p *PublicIPWatcher) updateLocalEndpoint(publicIPs []string) error {
err := p.config.LocalEndpoint.Update(context.TODO(), func(existing *submv1.EndpointSpec) {
existing.PublicIP = publicIP
for _, publicIP := range publicIPs {
existing.SetPublicIP(publicIP)
}
})

return errors.Wrap(err, "error updating the public IP of the local endpoint")
Expand Down
5 changes: 3 additions & 2 deletions pkg/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,8 +435,9 @@ func submarinerClusterFrom(submSpec *types.SubmarinerSpecification) *types.Subma
return &types.SubmarinerCluster{
ID: submSpec.ClusterID,
Spec: subv1.ClusterSpec{
ClusterID: submSpec.ClusterID,
ColorCodes: []string{"blue"}, // This is a fake value, used only for upgrade purposes
ClusterID: submSpec.ClusterID,
ColorCodes: []string{"blue"}, // This is a fake value, used only for upgrade purposes
// TODO_IPV6: delete cidr.ExtractIPv4Subnets
ServiceCIDR: cidr.ExtractIPv4Subnets(submSpec.ServiceCidr),
ClusterCIDR: cidr.ExtractIPv4Subnets(submSpec.ClusterCidr),
GlobalCIDR: globalCIDR,
Expand Down
9 changes: 9 additions & 0 deletions pkg/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package types

import (
subv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1"
k8snet "k8s.io/utils/net"
)

type SubmarinerCluster struct {
Expand Down Expand Up @@ -50,3 +51,11 @@ type SubmarinerSpecification struct {
HealthCheckMaxPacketLossCount int
MetricsPort int `default:"32780"`
}

func (subSpec *SubmarinerSpecification) GetIPFamilies() [2]k8snet.IPFamily {
var ipFamilies [2]k8snet.IPFamily
// TODO_IPV6: set ipFamilies according to ClusterCidr content
ipFamilies[0] = k8snet.IPv4

return ipFamilies
}

0 comments on commit b1326ef

Please sign in to comment.