From 314873f0836700ab00347c062db868b4d4f8dbd1 Mon Sep 17 00:00:00 2001 From: Jared Allard Date: Thu, 24 Sep 2020 12:42:49 -0700 Subject: [PATCH] fix(proxier): allocate IPs on ProxyConnection Start(), free on Close() (#12) * fix(proxier): allocate IPs on ProxyConnection Start(), free on Close() * fix: allocate IP before creating pf, fix deallocation on service removal --- internal/proxier/proxy.go | 76 +++++----------------------- internal/proxier/proxy_connection.go | 64 +++++++++++++++++++++-- 2 files changed, 73 insertions(+), 67 deletions(-) diff --git a/internal/proxier/proxy.go b/internal/proxier/proxy.go index a3fdee4..3781b22 100644 --- a/internal/proxier/proxy.go +++ b/internal/proxier/proxy.go @@ -29,7 +29,6 @@ import ( "github.com/sirupsen/logrus" "github.com/txn2/txeh" corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" kruntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes" @@ -104,29 +103,18 @@ func NewProxier(k kubernetes.Interface, kconf *rest.Config, l logrus.FieldLogger } } -// serviceAddresses returns all of the valid addresses -// for a given kubernetes service -func serviceAddresses(s *corev1.Service) []string { - return []string{ - s.Name, - fmt.Sprintf("%s.%s", s.Name, s.Namespace), - fmt.Sprintf("%s.%s.svc", s.Name, s.Namespace), - fmt.Sprintf("%s.%s.svc.cluster", s.Name, s.Namespace), - fmt.Sprintf("%s.%s.svc.cluster.local", s.Name, s.Namespace), - } -} - func (p *Proxier) handleInformerEvent(ctx context.Context, event string, obj interface{}) { //nolint:funlen,gocyclo k, _ := cache.MetaNamespaceKeyFunc(obj) item := "" - switch obj.(type) { + switch tobj := obj.(type) { case *corev1.Pod: item = "pod" case *corev1.Service: item = "service" case *ProxyConnection: item = "connection" + k = tobj.Service.GetKey() default: // skip unknown types p.log.WithFields(logrus.Fields{ @@ -172,7 +160,9 @@ func (p *Proxier) handleInformerEvent(ctx context.Context, event string, obj int return } - pc.Close() + if err := pc.Close(); err != nil { + p.log.WithField("service", k).WithError(err).Debug("failed to close proxy connection") + } s := pc.Service serviceKey := s.GetKey() @@ -209,16 +199,13 @@ func (p *Proxier) handleInformerEvent(ctx context.Context, event string, obj int } // close the underlying port-forward - pc.Close() + if err := pc.Close(); err != nil { + p.log.WithField("service", k).WithError(err).Debug("failed to close proxy connection") + } // reset the activeServices section for this service p.activeServices[k] = nil - p.hosts.RemoveAddresses(serviceAddresses(obj.(*corev1.Service))) - if err := p.hosts.Save(); err != nil { - p.log.Warnf("failed to clean hosts file: %v", err) - } - p.log.Warnf("tunnel for %s has been destroyed due to the underlying service being deleted", k) } } @@ -303,8 +290,6 @@ func (p *Proxier) allocateIP(serviceKey string) (*ipam.IP, error) { // We only need to create alias on darwin, on other platforms // lo0 becomes lo and routes the full /8 if runtime.GOOS == "darwin" { - // TODO(jaredallard): This logic should be moved into the - // actual proxy connection args := []string{"lo0", "alias", ipAddress.IP.String(), "up"} if err := exec.Command("ifconfig", args...).Run(); err != nil { return nil, errors.Wrap(err, "failed to create ip link") @@ -351,15 +336,9 @@ func (p *Proxier) createProxy(ctx context.Context, s *Service) error { //nolint: p.log.Infof("creating tunnel for service %s", serviceKey) - ipAddress, err := p.allocateIP(serviceKey) - if err != nil { - return errors.Wrap(err, "failed to allocate IP") - } - p.connMutex.Lock() p.activeServices[serviceKey] = &ProxyConnection{ proxier: p, - IP: ipAddress, Ports: ports, Service: *s, Pod: *pod, @@ -375,13 +354,6 @@ func (p *Proxier) createProxy(ctx context.Context, s *Service) error { //nolint: ) } - // only add addresses for services we actually are routing to - p.log.Debugf("adding hosts file entry for service '%s'", serviceKey) - p.hosts.AddHosts(ipAddress.IP.String(), serviceAddresses(kserv)) - if err := p.hosts.Save(); err != nil { - return errors.Wrap(err, "failed to save address to hosts") - } - return nil } @@ -423,34 +395,14 @@ func (p *Proxier) Proxy(ctx context.Context) error { p.log.Info("cleaning up ...") - for k := range p.activeServices { - namespace, name, err := cache.SplitMetaNamespaceKey(k) - if err != nil { - // TODO: handle this - continue - } - - // cleanup the DNS entries - kserv := &corev1.Service{ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace}} - addrs := serviceAddresses(kserv) - - p.log.WithField("addresses", addrs).Debug("cleaning up hosts entry") - p.hosts.RemoveHosts(addrs) - if err := p.hosts.Save(); err != nil { - p.log.Warnf("failed to clean hosts file: %v", err) - } - } - - // if we're on a platform that needed ip aliasing, cleanup - // all of the ip aliases - if runtime.GOOS == "darwin" { - for _, ip := range p.serviceIPs { - args := []string{"lo0", "-alias", ip.IP.String()} - if err := exec.Command("ifconfig", args...).Run(); err != nil { - return errors.Wrapf(err, "failed to remove ip alias '%s'", ip.IP.String()) - } + p.connMutex.Lock() + for k, pc := range p.activeServices { + if err := pc.Close(); err != nil { + p.log.WithField("service", k).WithError(err).Debug("failed to close proxy connection") } + p.activeServices[k] = nil } + p.connMutex.Unlock() p.log.Info("cleaned up") diff --git a/internal/proxier/proxy_connection.go b/internal/proxier/proxy_connection.go index 778de96..7b48be1 100644 --- a/internal/proxier/proxy_connection.go +++ b/internal/proxier/proxy_connection.go @@ -15,6 +15,9 @@ package proxier import ( "context" + "fmt" + "os/exec" + "runtime" "github.com/jaredallard/localizer/internal/kube" "github.com/metal-stack/go-ipam" @@ -43,14 +46,41 @@ type ProxyConnection struct { Pod corev1.Pod } +// GetAddresses returns all of the valid addresses +// for a given kubernetes service +func (pc *ProxyConnection) GetAddresses() []string { + s := pc.Service + return []string{ + s.Name, + fmt.Sprintf("%s.%s", s.Name, s.Namespace), + fmt.Sprintf("%s.%s.svc", s.Name, s.Namespace), + fmt.Sprintf("%s.%s.svc.cluster", s.Name, s.Namespace), + fmt.Sprintf("%s.%s.svc.cluster.local", s.Name, s.Namespace), + } +} + // Start starts a proxy connection func (pc *ProxyConnection) Start(ctx context.Context) error { + serviceKey := pc.Service.GetKey() + ipAddress, err := pc.proxier.allocateIP(serviceKey) + if err != nil { + return errors.Wrap(err, "failed to allocate IP") + } + pc.IP = ipAddress + fw, err := kube.CreatePortForward(ctx, pc.proxier.rest, pc.proxier.kconf, &pc.Pod, pc.IP.IP.String(), pc.Ports) if err != nil { return errors.Wrap(err, "failed to create tunnel") } pc.fw = fw + // only add addresses for services we actually are routing to + pc.proxier.log.Debugf("adding hosts file entry for service '%s'", serviceKey) + pc.proxier.hosts.AddHosts(pc.IP.IP.String(), pc.GetAddresses()) + if err := pc.proxier.hosts.Save(); err != nil { + return errors.Wrap(err, "failed to save address to hosts") + } + go func() { // TODO(jaredallard): Figure out a way to better backoff errors here if err := fw.ForwardPorts(); err != nil { @@ -76,20 +106,44 @@ func (pc *ProxyConnection) Start(ctx context.Context) error { // Close closes the current proxy connection and marks it as // no longer being active func (pc *ProxyConnection) Close() error { + // if it's nil then it's already been cleaned up + if pc == nil { + return nil + } + // note: If the parent context was canceled // this has already been closed if pc.fw != nil { pc.fw.Close() } - // if we had an ip address, free it + // cleanup the DNS entries for this ProxyConnection + pc.proxier.hosts.RemoveAddresses(pc.GetAddresses()) + if err := pc.proxier.hosts.Save(); err != nil { + return errors.Wrap(err, "failed to remove hosts entry") + } + + // if we have an ip we should release it if pc.IP != nil { - _, err := pc.proxier.ipam.ReleaseIP(pc.IP) - if err != nil { - return errors.Wrap(err, "failed to free IP address") + // If we are on a platform that needs aliases + // then we need to remove it + if runtime.GOOS == "darwin" { + ipStr := pc.IP.IP.String() + args := []string{"lo0", "-alias", ipStr} + if err := exec.Command("ifconfig", args...).Run(); err != nil { + return errors.Wrapf(err, "failed to remove ip alias '%s'", ipStr) + } } + + // release the IP after cleanup, in case it can't be released + if _, err := pc.proxier.ipam.ReleaseIP(pc.IP); err != nil { + return errors.Wrap(err, "failed to release IP address") + } + + pc.proxier.ipMutex.Lock() + defer pc.proxier.ipMutex.Unlock() + pc.proxier.serviceIPs[pc.Service.GetKey()] = nil } - // we'll return an error one day return nil }