diff --git a/README.md b/README.md
index bb380f0..6520b06 100644
--- a/README.md
+++ b/README.md
@@ -4,11 +4,11 @@ A no-frills local development approach for Kubernetes powered Developer Environm
 
 ## Why another CLI tool?
 
-Tools such as; Telepresence, Skaffold, and Tilt all attempt to solve the problem of getting users
+Tools such as Telepresence, Skaffold, and others all attempt to solve the problem of getting users
 used to using Kubernetes. This is a pretty big task given that Kubernetes has a gigantic surface area.
 From my experience (**keyword**: _my experience_), developers have no interest in what
-platform they are deploying to. All they care about is it's easy to do and that local development is
-not complicated or different from what they are used to. I, also, firmly belive that the best dev-tool is
+platform they are deploying to. All they care about is that it's easy to work with and that local development is
+not complicated or different from what they are used to. I also firmly believe that the best dev-tool is
 a tool that requires no configuration, and is self-explanatory. Which these tools tend... to not be.
 
 Given the above, localizer attempts to solve this problem with a few rules:
@@ -22,7 +22,7 @@ they were running "locally" (**Note**: Only on Linux do containers _actually_ ru
 
 Given the context of why this was created, and the constraints listed therein, localizer solves these issues
 by emulating services running locally. It, essentially, can be considered a fancy wrapper around `kubectl port-forward`
-in it's basic operation. Services in a cluster are automatically discovered and port-forwards are created. While running
+in its basic operation. Services in a cluster are automatically discovered and tunnels are created. While running,
 an entry will be added to your local dns resolver that will also allow the service to be looked up by its Kubernetes
 service name. This allows a thin emulation layer of Kubernetes, without the caveats of a real Kubernetes cluster.
 
@@ -38,25 +38,18 @@ When running `localizer expose ` your local machine will look for a
 Kubernetes cluster, and if it exists it will create a container that will proxy traffic sent to it to your local
 machine allowing remote resources to access your local machine as if they were also running locally.
 
-
-## How do run `localizer`?
+## How do I run `localizer`?
 
 Easy, just download a release from [Github Releases](/releases) and run the following:
 
 ```
-$ localizer
+$ sudo localizer
 ```
 
 This will attempt to proxy all services in Kubernetes to your local machine under their respective ports.
 
 ## FAQ
 
-### Help! I have a port-collision, what do I do?
-
-The downside to local development is this happens :( However, we have a way to "change" the port that is exposed locally.
-Simply add a `localizer.jaredallard.github.com/remap-servicePortName: "localPort"` annotation to the service, and that port
-will be mapped to `localPort` instead of `servicePort` when `localizer` is run
-
 ### Does `localizer` support Windows?
 
 WSL2 should work, and I'd consider it supported. I wrote most of this on WSL2, but I will likely maintain it on `macOS`.
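The README above frames localizer as a thin wrapper around `kubectl port-forward` plus a hosts-file entry per service. As context for the Go changes that follow, here is a minimal sketch of that mechanism using client-go's `portforward` package, mirroring the shape `internal/kube.CreatePortForward` takes after this change (an address plus a list of `local:remote` pairs). The package and function names here are illustrative only, not part of the codebase:

```go
package example

import (
	"context"
	"io/ioutil"
	"net/http"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/portforward"
	"k8s.io/client-go/transport/spdy"
)

// forwardPorts dials a pod's portforward subresource and forwards a list of
// "local:remote" pairs on the given address — roughly what localizer does for
// every discovered service. Illustrative sketch, not the package's own API.
func forwardPorts(ctx context.Context, kconf *rest.Config, k kubernetes.Interface,
	po *corev1.Pod, ip string, ports []string) (*portforward.PortForwarder, error) {
	req := k.CoreV1().RESTClient().Post().
		Resource("pods").
		Namespace(po.Namespace).
		Name(po.Name).
		SubResource("portforward")

	transport, upgrader, err := spdy.RoundTripperFor(kconf)
	if err != nil {
		return nil, err
	}

	// The SPDY dialer upgrades a plain HTTP request into the streaming
	// connection the kubelet expects for port-forwarding.
	dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", req.URL())

	// e.g. ports = []string{"8080:80"}: listen on ip:8080 locally and forward
	// to port 80 in the pod. ForwardPorts() must be called on the result.
	return portforward.NewOnAddresses(dialer, []string{ip}, ports, ctx.Done(), nil, ioutil.Discard, ioutil.Discard)
}
```

Passing the whole `ports` slice to one forwarder is the core of this patch: it is what allows a single tunnel per service instead of one per port.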
diff --git a/cmd/localizer/localizer.go b/cmd/localizer/localizer.go
index 3653b37..c792ea9 100644
--- a/cmd/localizer/localizer.go
+++ b/cmd/localizer/localizer.go
@@ -211,7 +211,7 @@ func main() { //nolint:funlen,gocyclo
 
 			p, err := e.Expose(ctx, mappedServicePorts, serviceSplit[0], serviceSplit[1])
 			if err != nil {
-				return errors.Wrap(err, "failed to create reverse port-forward")
+				return errors.Wrap(err, "failed to create reverse tunnel")
 			}
 
 			return p.Start(ctx)
diff --git a/internal/expose/port.go b/internal/expose/port.go
index 22775a4..d6b3700 100644
--- a/internal/expose/port.go
+++ b/internal/expose/port.go
@@ -58,7 +58,7 @@ func (s *scaledObjectType) GetKey() string {
 }
 
 func (p *ServiceForward) createServerPortForward(ctx context.Context, po *corev1.Pod) (*portforward.PortForwarder, error) {
-	return kube.CreatePortForward(ctx, p.c.k.CoreV1().RESTClient(), p.c.kconf, po, "0.0.0.0", "50:50")
+	return kube.CreatePortForward(ctx, p.c.k.CoreV1().RESTClient(), p.c.kconf, po, "0.0.0.0", []string{"50:50"})
 }
 
 func (p *ServiceForward) createServerPodAndTransport(ctx context.Context) (func(), error) { //nolint:funlen,gocyclo
@@ -158,11 +158,11 @@ loop:
 		}
 	}
 
-	p.c.log.Info("pod is ready, creating port-forward(s)")
+	p.c.log.Info("pod is ready, creating tunnel")
 
 	fw, err := p.createServerPortForward(ctx, po)
 	if err != nil {
-		return func() {}, errors.Wrap(err, "failed to create port-forward for underlying transport")
+		return func() {}, errors.Wrap(err, "failed to create tunnel for underlying transport")
 	}
 
 	fw.Ready = make(chan struct{})
diff --git a/internal/kube/client.go b/internal/kube/client.go
index e148cf5..5e5b204 100644
--- a/internal/kube/client.go
+++ b/internal/kube/client.go
@@ -69,7 +69,7 @@ func GetKubeClient(contextName string) (*rest.Config, kubernetes.Interface, erro
 }
 
 func CreatePortForward(ctx context.Context, r rest.Interface, rc *rest.Config,
-	p *corev1.Pod, ip, port string) (*portforward.PortForwarder, error) {
+	p *corev1.Pod, ip string, ports []string) (*portforward.PortForwarder, error) {
 	req := r.Post().
 		Resource("pods").
 		Namespace(p.Namespace).
@@ -82,7 +82,7 @@ func CreatePortForward(ctx context.Context, r rest.Interface, rc *rest.Config,
 	}
 
 	dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", req.URL())
-	return portforward.NewOnAddresses(dialer, []string{ip}, []string{port}, ctx.Done(), nil, ioutil.Discard, ioutil.Discard)
+	return portforward.NewOnAddresses(dialer, []string{ip}, ports, ctx.Done(), nil, ioutil.Discard, ioutil.Discard)
 }
 
 type ResolvedServicePort struct {
diff --git a/internal/proxier/proxy.go b/internal/proxier/proxy.go
index 82fcb5b..7f9c3af 100644
--- a/internal/proxier/proxy.go
+++ b/internal/proxier/proxy.go
@@ -60,10 +60,12 @@ type Proxier struct {
 
 	// active{,services,Pods} are mapping indexes for
 	// ProxyConnections
-	connMutex      sync.Mutex
-	serviceIPs     map[string]*ipam.IP
-	activeServices map[string][]*ProxyConnection
-	activePods     map[string][]*ProxyConnection
+	connMutex  sync.Mutex
+	ipMutex    sync.Mutex
+	serviceIPs map[string]*ipam.IP
+
+	activeServices map[string]*ProxyConnection
+	activePods     map[string]*ProxyConnection
 }
 
 // NewProxier creates a new proxier instance
@@ -96,8 +98,8 @@ func NewProxier(k kubernetes.Interface, kconf *rest.Config, l logrus.FieldLogger
 		ipamPrefix: prefix,
 		serviceIPs: make(map[string]*ipam.IP),
 
-		activePods:     make(map[string][]*ProxyConnection),
-		activeServices: make(map[string][]*ProxyConnection),
+		activePods:     make(map[string]*ProxyConnection),
+		activeServices: make(map[string]*ProxyConnection),
 	}
 }
 
@@ -158,68 +160,63 @@ func (p *Proxier) handleInformerEvent(ctx context.Context, event string, obj int
 			// so, we mimic the pod dead event
 			pc := obj.(*ProxyConnection)
 			k, _ = cache.MetaNamespaceKeyFunc(&pc.Pod)
-			p.log.Infof("underlying connection died for %d (-> %s:%d)", pc.LocalPort, k, pc.RemotePort)
+			p.log.Warnf("underlying connection died for %s", k)
 		}
 
-		refreshServices := make([]Service, len(p.activePods[k]))
-		refreshPorts := make([]string, len(p.activePods[k]))
-
+		// skip pods we don't know about
 		p.connMutex.Lock()
-		for i, pc := range p.activePods[k] {
-			refreshServices[i] = pc.Service
-			refreshPorts[i] = pc.GetPort()
-			pc.Close()
+		pc := p.activePods[k]
+		if pc == nil {
+			p.connMutex.Unlock()
+			return
 		}
+		pc.Close()
+
+		s := pc.Service
 
+		// reset the activePods
 		p.activePods[k] = nil
 		p.connMutex.Unlock()
 
-		if len(refreshPorts) > 0 {
-			p.log.WithField("ports", refreshPorts).
-				Warnf("port-forward for %s is being refreshed due to underlying pod being destroyed", k)
-		}
-
-		for _, s := range refreshServices {
-			ticker := backoff.NewTicker(backoff.NewExponentialBackOff())
-			for {
-				select {
-				case <-ticker.C:
-					if err := p.createProxy(ctx, &s); err != nil { //nolint:scopelint
-						p.log.Warnf("failed to refresh port-forward for %s: %v (trying again)", k, err)
-					}
-					ticker.Stop()
-					p.log.WithField("ports", refreshPorts).
-						Infof("refreshed port-forward(s) for '%s'", k)
-					return
-				case <-ctx.Done():
-					return
+		p.log.Warnf("tunnel for %s is being refreshed due to underlying pod being destroyed", k)
+		ticker := backoff.NewTicker(backoff.NewExponentialBackOff())
+		for {
+			select {
+			case <-ticker.C:
+				if err := p.createProxy(ctx, &s); err != nil { //nolint:scopelint
+					p.log.WithError(err).Warnf("failed to refresh tunnel for %s (trying again)", k)
 				}
+				ticker.Stop()
+				p.log.Infof("refreshed tunnel for '%s'", k)
+				return
+			case <-ctx.Done():
+				return
 			}
 		}
 	case "service":
-		removedPorts := make([]string, len(p.activeServices[k]))
-
 		p.connMutex.Lock()
-		for i, pc := range p.activeServices[k] {
-			removedPorts[i] = pc.GetPort()
-			pc.Close()
+		defer p.connMutex.Unlock()
+
+		// ignore services we don't know anything about
+		pc := p.activeServices[k]
+		if pc == nil {
+			return
 		}
+		// close the underlying port-forward
+		pc.Close()
 
+		// reset the activeServices section for this service
 		p.activeServices[k] = nil
-		p.connMutex.Unlock()
 
 		p.hosts.RemoveAddresses(serviceAddresses(obj.(*corev1.Service)))
 		if err := p.hosts.Save(); err != nil {
 			p.log.Warnf("failed to clean hosts file: %v", err)
 		}
 
-		if len(removedPorts) > 0 {
-			p.log.WithField("ports", removedPorts).
-				Warnf("port-forward for %s has been destroyed due to the underlying service being deleted", k)
-		}
+		p.log.Warnf("tunnel for %s has been destroyed due to the underlying service being deleted", k)
 	}
 }
 
@@ -287,6 +284,33 @@ func (p *Proxier) findPodBySelector(o runtime.Object) (*corev1.Pod, error) {
 	return pod, err
 }
 
+// allocateIP allocates an ip for a given service key
+func (p *Proxier) allocateIP(serviceKey string) (*ipam.IP, error) {
+	p.ipMutex.Lock()
+	defer p.ipMutex.Unlock()
+
+	ipAddress := p.serviceIPs[serviceKey]
+	if ipAddress == nil {
+		var err error
+		ipAddress, err = p.ipam.AcquireIP(p.ipamPrefix.Cidr)
+		if err != nil {
+			return nil, err
+		}
+
+		// TODO(jaredallard): This logic should be moved into the
+		// actual proxy connection
+		args := []string{"lo0", "alias", ipAddress.IP.String(), "up"}
+		if err := exec.Command("ifconfig", args...).Run(); err != nil {
+			return nil, errors.Wrap(err, "failed to create ip link")
+		}
+
+		p.log.WithField("service", serviceKey).Debugf("allocated ip address %s", ipAddress.IP)
+		p.serviceIPs[serviceKey] = ipAddress
+	}
+
+	return p.serviceIPs[serviceKey], nil
+}
+
 // createProxy creates a proxy connection
 func (p *Proxier) createProxy(ctx context.Context, s *Service) error { //nolint:funlen,gocyclo
 	item, exists, err := p.servStore.GetByKey(fmt.Sprintf("%s/%s", s.Namespace, s.Name))
@@ -313,59 +337,35 @@ func (p *Proxier) createProxy(ctx context.Context, s *Service) error { //nolint:
 		return fmt.Errorf("selected pod wasn't running, got status: %v", pod.Status.Phase)
 	}
 
-	ipAddress := p.serviceIPs[serviceKey]
-	if ipAddress == nil {
-		ipAddress, err = p.ipam.AcquireIP(p.ipamPrefix.Cidr)
-		if err != nil {
-			return errors.Wrap(err, "failed to allocate IP address")
-		}
-
-		args := []string{"lo0", "alias", ipAddress.IP.String(), "up"}
-		if err := exec.Command("ifconfig", args...).Run(); err != nil {
-			return errors.Wrap(err, "failed to create ip link")
-		}
-
-		p.serviceIPs[serviceKey] = ipAddress
+	ports := make([]string, len(s.Ports))
+	for i, port := range s.Ports {
+		ports[i] = fmt.Sprintf("%d:%d", port.LocalPort, port.RemotePort)
 	}
 
-	for _, port := range s.Ports {
-		p.log.Infof("creating port-forward '%s:%d'", serviceKey, port.RemotePort)
+	p.log.Infof("creating tunnel for service %s", serviceKey)
 
-		// build the linking tables
-		// port -> conn
-		p.connMutex.Lock()
-		conn := &ProxyConnection{
-			p,
-			nil,
-
-			ipAddress,
-			port.LocalPort,
-			port.RemotePort,
-			*s,
-			*pod,
-			false,
-		}
-
-		// service -> []Conn
-		if p.activeServices[serviceKey] == nil {
-			p.activeServices[serviceKey] = make([]*ProxyConnection, 0)
-		}
-		p.activeServices[serviceKey] = append(p.activeServices[serviceKey], conn)
-
-		// pod -> []Conn
-		if p.activePods[podKey] == nil {
-			p.activePods[podKey] = make([]*ProxyConnection, 0)
-		}
-		p.activePods[podKey] = append(p.activePods[podKey], conn)
-		p.connMutex.Unlock()
+	ipAddress, err := p.allocateIP(serviceKey)
+	if err != nil {
+		return errors.Wrap(err, "failed to allocate IP")
+	}
 
-		// start the proxy
-		if err := conn.Start(ctx); err != nil {
-			p.log.Errorf(
-				"failed to start proxy for '%s:%d'",
-				serviceKey, port.RemotePort, port.LocalPort, err,
-			)
-		}
+	p.connMutex.Lock()
+	p.activeServices[serviceKey] = &ProxyConnection{
+		proxier: p,
+		IP:      ipAddress,
+		Ports:   ports,
+		Service: *s,
+		Pod:     *pod,
+	}
+	p.activePods[podKey] = p.activeServices[serviceKey]
+	p.connMutex.Unlock()
+
+	// start the proxy
+	if err := p.activeServices[serviceKey].Start(ctx); err != nil {
+		p.log.WithError(err).Errorf(
+			"failed to start proxy for %s",
+			serviceKey,
+		)
 	}
 
 	// only add addresses for services we actually are routing to
@@ -380,25 +380,40 @@ func (p *Proxier) createProxy(ctx context.Context, s *Service) error { //nolint:
 
 // Proxy starts a proxier.
 func (p *Proxier) Proxy(ctx context.Context) error {
-createLoop:
+	var wg sync.WaitGroup
 	for _, s := range p.s {
-		ticker := backoff.NewTicker(backoff.NewExponentialBackOff())
-	createIteratorLoop:
-		for {
-			select {
-			case <-ticker.C:
-				if err := p.createProxy(ctx, &s); err != nil { //nolint:scopelint
-					p.log.Warnf("failed to create port-forward for '%s/%s': %v (trying again)", s.Namespace, s.Name, err)
+		wg.Add(1)
+
+		// spawn a goroutine to create the tunnel
+		go func(s Service) {
+			ticker := backoff.NewTicker(backoff.NewExponentialBackOff())
+		createLoop:
+			for {
+				select {
+				case <-ticker.C:
+					if err := p.createProxy(ctx, &s); err != nil { //nolint:scopelint
+						p.log.WithError(err).Warnf("failed to create tunnel for '%s/%s' (trying again)", s.Namespace, s.Name)
+						continue
+					}
+					ticker.Stop()
+					break createLoop
+				case <-ctx.Done():
+					break createLoop
 				}
-				ticker.Stop()
-				break createIteratorLoop
-			case <-ctx.Done():
-				break createLoop
 			}
-		}
+
+			wg.Done()
+		}(s)
 	}
 
+	// wait for all of the proxies to be up
+	wg.Wait()
+
+	p.log.Info("all tunnels created successfully")
+
+	// wait for the process to be terminated
 	<-ctx.Done()
+
 	p.log.Info("cleaning up ...")
 
 	for k := range p.activeServices {
@@ -420,7 +435,7 @@ createLoop:
 	}
 
 	for _, ip := range p.serviceIPs {
-		args := []string{"lo0", "-alias", ip.IP.String(), "up"}
+		args := []string{"lo0", "-alias", ip.IP.String()}
 		if err := exec.Command("ifconfig", args...).Run(); err != nil {
 			return errors.Wrapf(err, "failed to remove ip alias '%s'", ip.IP.String())
 		}
diff --git a/internal/proxier/proxy_connection.go b/internal/proxier/proxy_connection.go
index 659596b..778de96 100644
--- a/internal/proxier/proxy_connection.go
+++ b/internal/proxier/proxy_connection.go
@@ -15,12 +15,13 @@ package proxier
 
 import (
 	"context"
-	"fmt"
 
 	"github.com/jaredallard/localizer/internal/kube"
 	"github.com/metal-stack/go-ipam"
 	"github.com/pkg/errors"
+	"github.com/sirupsen/logrus"
 	corev1 "k8s.io/api/core/v1"
+	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/portforward"
 )
 
@@ -29,33 +30,27 @@ type ProxyConnection struct {
 	proxier *Proxier
 	fw      *portforward.PortForwarder
 
-	IP         *ipam.IP
-	LocalPort  uint
-	RemotePort uint
+	// IP is the dedicated IP for this tunnel
+	IP *ipam.IP
 
-	Service Service
-	Pod     corev1.Pod
+	// Ports is an array of local:remote ports
+	Ports []string
 
-	// Active denotes if this connection is active
-	// or not
-	Active bool
-}
+	// Service is the service that this proxy is connected to
+	Service Service
 
-// GetPort returns the port as a string local:remote
-func (pc *ProxyConnection) GetPort() string {
-	return fmt.Sprintf("%d:%d", pc.LocalPort, pc.RemotePort)
+	// Pod is the pod powering this proxy
+	Pod corev1.Pod
 }
 
 // Start starts a proxy connection
 func (pc *ProxyConnection) Start(ctx context.Context) error {
-	fw, err := kube.CreatePortForward(ctx, pc.proxier.rest, pc.proxier.kconf, &pc.Pod, pc.IP.IP.String(), pc.GetPort())
+	fw, err := kube.CreatePortForward(ctx, pc.proxier.rest, pc.proxier.kconf, &pc.Pod, pc.IP.IP.String(), pc.Ports)
 	if err != nil {
-		return errors.Wrap(err, "failed to create port-forward")
+		return errors.Wrap(err, "failed to create tunnel")
 	}
 	pc.fw = fw
 
-	pc.Active = true
-
 	go func() {
 		// TODO(jaredallard): Figure out a way to better backoff errors here
 		if err := fw.ForwardPorts(); err != nil {
@@ -64,7 +59,13 @@ func (pc *ProxyConnection) Start(ctx context.Context) error {
 			pc.Close()
 			pc.fw = nil
 
-			pc.proxier.log.WithField("port", pc.GetPort()).Debug("port-forward died")
+			k, _ := cache.MetaNamespaceKeyFunc(pc.Service)
+			pc.proxier.log.WithError(err).WithFields(logrus.Fields{
+				"ports":   pc.Ports,
+				"service": k,
+			}).Debug("tunnel died")
+
+			// trigger the recreate logic
 			pc.proxier.handleInformerEvent(ctx, "connection-dead", pc)
 		}
 	}()
@@ -75,14 +76,13 @@ func (pc *ProxyConnection) Start(ctx context.Context) error {
 // Close closes the current proxy connection and marks it as
 // no longer being active
 func (pc *ProxyConnection) Close() error {
-	pc.Active = false
-
 	// note: If the parent context was canceled
 	// this has already been closed
 	if pc.fw != nil {
 		pc.fw.Close()
 	}
 
+	// if we had an ip address, free it
 	if pc.IP != nil {
 		_, err := pc.proxier.ipam.ReleaseIP(pc.IP)
 		if err != nil {
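The reworked `createProxy` collapses what used to be one connection per port into a single `ProxyConnection` per service: every service port is rendered as a `local:remote` string and all of them are bound to one dedicated loopback alias allocated by `allocateIP`. A small self-contained sketch of those two pieces follows; the `ServicePort` type and both function names are stand-ins for illustration, not identifiers from the package:

```go
package example

import (
	"fmt"
	"os/exec"

	"github.com/pkg/errors"
)

// ServicePort is a stand-in for the local/remote pair the proxier resolves
// for each Kubernetes service port.
type ServicePort struct {
	LocalPort  uint
	RemotePort uint
}

// buildPortPairs renders the "local:remote" strings handed to the
// port-forwarder in one batch, as createProxy now does for a whole service.
func buildPortPairs(ports []ServicePort) []string {
	out := make([]string, len(ports))
	for i, p := range ports {
		out[i] = fmt.Sprintf("%d:%d", p.LocalPort, p.RemotePort)
	}
	return out
}

// aliasLoopback adds a dedicated loopback alias for a service IP, matching the
// ifconfig invocation in allocateIP (macOS-style "lo0"; assumes ifconfig exists
// and the process has permission to modify interfaces, hence `sudo localizer`).
func aliasLoopback(ip string) error {
	args := []string{"lo0", "alias", ip, "up"}
	if err := exec.Command("ifconfig", args...).Run(); err != nil {
		return errors.Wrap(err, "failed to create ip link")
	}
	return nil
}
```

With the ports grouped this way, a single `ProxyConnection` (and a single `PortForwarder`) covers every port of a service, which is what lets `handleInformerEvent` track one connection per pod or service key instead of a slice of them.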