Skip to content

Commit

Permalink
fix(proxier): allocate IPs on ProxyConnection Start(), free on Close() (
Browse files Browse the repository at this point in the history
#12)

* fix(proxier): allocate IPs on ProxyConnection Start(), free on Close()

* fix: allocate IP before creating pf, fix deallocation on service removal
  • Loading branch information
jaredallard authored Sep 24, 2020
1 parent 4f4b57a commit 314873f
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 67 deletions.
76 changes: 14 additions & 62 deletions internal/proxier/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/sirupsen/logrus"
"github.com/txn2/txeh"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
kruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -104,29 +103,18 @@ func NewProxier(k kubernetes.Interface, kconf *rest.Config, l logrus.FieldLogger
}
}

// serviceAddresses returns all of the valid addresses
// for a given kubernetes service
func serviceAddresses(s *corev1.Service) []string {
return []string{
s.Name,
fmt.Sprintf("%s.%s", s.Name, s.Namespace),
fmt.Sprintf("%s.%s.svc", s.Name, s.Namespace),
fmt.Sprintf("%s.%s.svc.cluster", s.Name, s.Namespace),
fmt.Sprintf("%s.%s.svc.cluster.local", s.Name, s.Namespace),
}
}

func (p *Proxier) handleInformerEvent(ctx context.Context, event string, obj interface{}) { //nolint:funlen,gocyclo
k, _ := cache.MetaNamespaceKeyFunc(obj)

item := ""
switch obj.(type) {
switch tobj := obj.(type) {
case *corev1.Pod:
item = "pod"
case *corev1.Service:
item = "service"
case *ProxyConnection:
item = "connection"
k = tobj.Service.GetKey()
default:
// skip unknown types
p.log.WithFields(logrus.Fields{
Expand Down Expand Up @@ -172,7 +160,9 @@ func (p *Proxier) handleInformerEvent(ctx context.Context, event string, obj int
return
}

pc.Close()
if err := pc.Close(); err != nil {
p.log.WithField("service", k).WithError(err).Debug("failed to close proxy connection")
}

s := pc.Service
serviceKey := s.GetKey()
Expand Down Expand Up @@ -209,16 +199,13 @@ func (p *Proxier) handleInformerEvent(ctx context.Context, event string, obj int
}

// close the underlying port-forward
pc.Close()
if err := pc.Close(); err != nil {
p.log.WithField("service", k).WithError(err).Debug("failed to close proxy connection")
}

// reset the activeServices section for this service
p.activeServices[k] = nil

p.hosts.RemoveAddresses(serviceAddresses(obj.(*corev1.Service)))
if err := p.hosts.Save(); err != nil {
p.log.Warnf("failed to clean hosts file: %v", err)
}

p.log.Warnf("tunnel for %s has been destroyed due to the underlying service being deleted", k)
}
}
Expand Down Expand Up @@ -303,8 +290,6 @@ func (p *Proxier) allocateIP(serviceKey string) (*ipam.IP, error) {
// We only need to create alias on darwin, on other platforms
// lo0 becomes lo and routes the full /8
if runtime.GOOS == "darwin" {
// TODO(jaredallard): This logic should be moved into the
// actual proxy connection
args := []string{"lo0", "alias", ipAddress.IP.String(), "up"}
if err := exec.Command("ifconfig", args...).Run(); err != nil {
return nil, errors.Wrap(err, "failed to create ip link")
Expand Down Expand Up @@ -351,15 +336,9 @@ func (p *Proxier) createProxy(ctx context.Context, s *Service) error { //nolint:

p.log.Infof("creating tunnel for service %s", serviceKey)

ipAddress, err := p.allocateIP(serviceKey)
if err != nil {
return errors.Wrap(err, "failed to allocate IP")
}

p.connMutex.Lock()
p.activeServices[serviceKey] = &ProxyConnection{
proxier: p,
IP: ipAddress,
Ports: ports,
Service: *s,
Pod: *pod,
Expand All @@ -375,13 +354,6 @@ func (p *Proxier) createProxy(ctx context.Context, s *Service) error { //nolint:
)
}

// only add addresses for services we actually are routing to
p.log.Debugf("adding hosts file entry for service '%s'", serviceKey)
p.hosts.AddHosts(ipAddress.IP.String(), serviceAddresses(kserv))
if err := p.hosts.Save(); err != nil {
return errors.Wrap(err, "failed to save address to hosts")
}

return nil
}

Expand Down Expand Up @@ -423,34 +395,14 @@ func (p *Proxier) Proxy(ctx context.Context) error {

p.log.Info("cleaning up ...")

for k := range p.activeServices {
namespace, name, err := cache.SplitMetaNamespaceKey(k)
if err != nil {
// TODO: handle this
continue
}

// cleanup the DNS entries
kserv := &corev1.Service{ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace}}
addrs := serviceAddresses(kserv)

p.log.WithField("addresses", addrs).Debug("cleaning up hosts entry")
p.hosts.RemoveHosts(addrs)
if err := p.hosts.Save(); err != nil {
p.log.Warnf("failed to clean hosts file: %v", err)
}
}

// if we're on a platform that needed ip aliasing, cleanup
// all of the ip aliases
if runtime.GOOS == "darwin" {
for _, ip := range p.serviceIPs {
args := []string{"lo0", "-alias", ip.IP.String()}
if err := exec.Command("ifconfig", args...).Run(); err != nil {
return errors.Wrapf(err, "failed to remove ip alias '%s'", ip.IP.String())
}
p.connMutex.Lock()
for k, pc := range p.activeServices {
if err := pc.Close(); err != nil {
p.log.WithField("service", k).WithError(err).Debug("failed to close proxy connection")
}
p.activeServices[k] = nil
}
p.connMutex.Unlock()

p.log.Info("cleaned up")

Expand Down
64 changes: 59 additions & 5 deletions internal/proxier/proxy_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ package proxier

import (
"context"
"fmt"
"os/exec"
"runtime"

"github.com/jaredallard/localizer/internal/kube"
"github.com/metal-stack/go-ipam"
Expand Down Expand Up @@ -43,14 +46,41 @@ type ProxyConnection struct {
Pod corev1.Pod
}

// GetAddresses returns all of the valid addresses
// for a given kubernetes service
func (pc *ProxyConnection) GetAddresses() []string {
s := pc.Service
return []string{
s.Name,
fmt.Sprintf("%s.%s", s.Name, s.Namespace),
fmt.Sprintf("%s.%s.svc", s.Name, s.Namespace),
fmt.Sprintf("%s.%s.svc.cluster", s.Name, s.Namespace),
fmt.Sprintf("%s.%s.svc.cluster.local", s.Name, s.Namespace),
}
}

// Start starts a proxy connection
func (pc *ProxyConnection) Start(ctx context.Context) error {
serviceKey := pc.Service.GetKey()
ipAddress, err := pc.proxier.allocateIP(serviceKey)
if err != nil {
return errors.Wrap(err, "failed to allocate IP")
}
pc.IP = ipAddress

fw, err := kube.CreatePortForward(ctx, pc.proxier.rest, pc.proxier.kconf, &pc.Pod, pc.IP.IP.String(), pc.Ports)
if err != nil {
return errors.Wrap(err, "failed to create tunnel")
}
pc.fw = fw

// only add addresses for services we actually are routing to
pc.proxier.log.Debugf("adding hosts file entry for service '%s'", serviceKey)
pc.proxier.hosts.AddHosts(pc.IP.IP.String(), pc.GetAddresses())
if err := pc.proxier.hosts.Save(); err != nil {
return errors.Wrap(err, "failed to save address to hosts")
}

go func() {
// TODO(jaredallard): Figure out a way to better backoff errors here
if err := fw.ForwardPorts(); err != nil {
Expand All @@ -76,20 +106,44 @@ func (pc *ProxyConnection) Start(ctx context.Context) error {
// Close closes the current proxy connection and marks it as
// no longer being active
func (pc *ProxyConnection) Close() error {
// if it's nil then it's already been cleaned up
if pc == nil {
return nil
}

// note: If the parent context was canceled
// this has already been closed
if pc.fw != nil {
pc.fw.Close()
}

// if we had an ip address, free it
// cleanup the DNS entries for this ProxyConnection
pc.proxier.hosts.RemoveAddresses(pc.GetAddresses())
if err := pc.proxier.hosts.Save(); err != nil {
return errors.Wrap(err, "failed to remove hosts entry")
}

// if we have an ip we should release it
if pc.IP != nil {
_, err := pc.proxier.ipam.ReleaseIP(pc.IP)
if err != nil {
return errors.Wrap(err, "failed to free IP address")
// If we are on a platform that needs aliases
// then we need to remove it
if runtime.GOOS == "darwin" {
ipStr := pc.IP.IP.String()
args := []string{"lo0", "-alias", ipStr}
if err := exec.Command("ifconfig", args...).Run(); err != nil {
return errors.Wrapf(err, "failed to remove ip alias '%s'", ipStr)
}
}

// release the IP after cleanup, in case it can't be released
if _, err := pc.proxier.ipam.ReleaseIP(pc.IP); err != nil {
return errors.Wrap(err, "failed to release IP address")
}

pc.proxier.ipMutex.Lock()
defer pc.proxier.ipMutex.Unlock()
pc.proxier.serviceIPs[pc.Service.GetKey()] = nil
}

// we'll return an error one day
return nil
}

0 comments on commit 314873f

Please sign in to comment.