diff --git a/cmd/localizer/localizer.go b/cmd/localizer/localizer.go
index a43513c..c8b4669 100644
--- a/cmd/localizer/localizer.go
+++ b/cmd/localizer/localizer.go
@@ -16,6 +16,7 @@ package main
 import (
 	"context"
 	"fmt"
+	"io/ioutil"
 	"os"
 	"os/signal"
 	"os/user"
@@ -23,6 +24,7 @@ import (
 	"strings"
 	"syscall"
 
+	"github.com/go-logr/logr"
 	"github.com/jaredallard/localizer/internal/expose"
 	"github.com/jaredallard/localizer/internal/kube"
 	"github.com/jaredallard/localizer/internal/proxier"
@@ -33,8 +35,65 @@ import (
 	"k8s.io/apimachinery/pkg/util/intstr"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
+	"k8s.io/klog/v2"
 )
 
+type klogToLogrus struct {
+	log logrus.FieldLogger
+}
+
+// Enabled tests whether this Logger is enabled. For example, commandline
+// flags might be used to set the logging verbosity and disable some info
+// logs.
+func (l *klogToLogrus) Enabled() bool {
+	return true
+}
+
+// Info logs a non-error message with the given key/value pairs as context.
+//
+// The msg argument should be used to add some constant description to
+// the log line. The key/value pairs can then be used to add additional
+// variable information. The key/value pairs should alternate string
+// keys and arbitrary values.
+func (l *klogToLogrus) Info(msg string, keysAndValues ...interface{}) {
+	l.log.Info(msg)
+}
+
+// Error logs an error, with the given message and key/value pairs as context.
+// It functions similarly to calling Info with the "error" named value, but may
+// have unique behavior, and should be preferred for logging errors (see the
+// package documentation for more information).
+//
+// The msg field should be used to add context to any underlying error,
+// while the err field should be used to attach the actual error that
+// triggered this log line, if present.
+func (l *klogToLogrus) Error(err error, msg string, keysAndValues ...interface{}) {
+	l.log.WithError(err).Error(msg)
+}
+
+// V returns a Logger value for a specific verbosity level, relative to
+// this Logger. In other words, V values are additive. A higher verbosity
+// level means a log message is less important. It's illegal to pass a log
+// level less than zero.
+func (l *klogToLogrus) V(level int) logr.Logger {
+	return l
+}
+
+// WithValues adds some key-value pairs of context to a logger.
+// See Info for documentation on how key/value pairs work.
+func (l *klogToLogrus) WithValues(keysAndValues ...interface{}) logr.Logger {
+	return l
+}
+
+// WithName adds a new element to the logger's name.
+// Successive calls to WithName continue to append
+// suffixes to the logger's name. It's strongly recommended
+// that name segments contain only letters, digits, and hyphens
+// (see the package documentation for more information).
+func (l *klogToLogrus) WithName(name string) logr.Logger {
+	return l
+}
+
 func main() { //nolint:funlen,gocyclo
 	ctx, cancel := context.WithCancel(context.Background())
 	log := logrus.New()
@@ -192,6 +251,11 @@ func main() { //nolint:funlen,gocyclo
 		log.Debug("set log format to JSON")
 	}
 
+	// disable client-go logging
+	discardLogger := logrus.New()
+	discardLogger.Out = ioutil.Discard
+	klog.SetLogger(&klogToLogrus{log: discardLogger})
+
 	kconf, k, err = kube.GetKubeClient(c.String("context"))
 	if err != nil {
 		return errors.Wrap(err, "failed to create kube client")
diff --git a/go.mod b/go.mod
index 25ddc33..ecc2839 100644
--- a/go.mod
+++ b/go.mod
@@ -9,6 +9,7 @@ require (
 	github.com/elazarl/goproxy v0.0.0-20191011121108-aa519ddbe484 // indirect
 	github.com/elazarl/goproxy/ext v0.0.0-20191011121108-aa519ddbe484 // indirect
 	github.com/function61/gokit v0.0.0-20200923114939-f8d7e065a5c3
+	github.com/go-logr/logr v0.2.0
 	github.com/imdario/mergo v0.3.8 // indirect
 	github.com/metal-stack/go-ipam v1.6.0
 	github.com/pkg/errors v0.9.1
@@ -21,6 +22,7 @@ require (
 	k8s.io/api v0.19.2
 	k8s.io/apimachinery v0.19.2
 	k8s.io/client-go v0.19.2
+	k8s.io/klog/v2 v2.2.0
 	k8s.io/kubectl v0.19.2
 )
diff --git a/internal/expose/port.go b/internal/expose/port.go
index 25bfc81..cb4deac 100644
--- a/internal/expose/port.go
+++ b/internal/expose/port.go
@@ -32,6 +32,24 @@ import (
 	"k8s.io/client-go/tools/portforward"
 )
 
+var (
+	// ErrUnderlyingTransportDied is triggered when the kubernetes port-forward loses
+	// connection. This results in the transport protocol dying as well.
+	ErrUnderlyingTransportDied = errors.New("underlying transport died")
+
+	// ErrUnderlyingTransportProtocolDied is triggered when the ssh tunnel loses connection.
+	// This can be caused by the ssh connection being destroyed or the port-forward being killed.
+	ErrUnderlyingTransportProtocolDied = errors.New("underlying transport protocol (ssh) died")
+
+	// ErrNotInitialized is used to start the initialization
+	// process. It is not an error, despite its name.
+	ErrNotInitialized = errors.New("not initialized")
+
+	// ErrUnderlyingTransportPodDestroyed is triggered only when a pod is destroyed. Note that
+	// this will usually cause ErrUnderlyingTransportDied to be triggered as well.
+	ErrUnderlyingTransportPodDestroyed = errors.New("underlying transport pod died")
+)
+
 type ServiceForward struct {
 	c *Client
 
@@ -56,11 +74,11 @@ func (s *scaledObjectType) GetKey() string {
 	return strings.ToLower(fmt.Sprintf("%s/%s/%s", s.Kind, s.Namespace, s.Name))
 }
 
-func (p *ServiceForward) createServerPortForward(ctx context.Context, po *corev1.Pod) (*portforward.PortForwarder, error) {
-	return kube.CreatePortForward(ctx, p.c.k.CoreV1().RESTClient(), p.c.kconf, po, "0.0.0.0", []string{"0:2222"})
+func (p *ServiceForward) createServerPortForward(ctx context.Context, po *corev1.Pod, localPort int) (*portforward.PortForwarder, error) {
+	return kube.CreatePortForward(ctx, p.c.k.CoreV1().RESTClient(), p.c.kconf, po, "0.0.0.0", []string{fmt.Sprintf("%d:2222", localPort)})
 }
 
-func (p *ServiceForward) createServerPodAndTransport(ctx context.Context) (cleanupFn func(), port int, err error) { //nolint:funlen,gocyclo
+func (p *ServiceForward) createServerPod(ctx context.Context) (func(), *corev1.Pod, error) { //nolint:funlen,gocyclo
 	// map the service ports into containerPorts, using the
 	containerPorts := make([]corev1.ContainerPort, len(p.Ports))
 	for i, port := range p.Ports {
@@ -78,13 +96,13 @@ func (p *ServiceForward) createServerPodAndTransport(ctx context.Context) (clean
 	// create a pod for our new expose service
 	exposedPortsJSON, err := json.Marshal(containerPorts)
 	if err != nil {
-		return func() {}, 0, err
+		return func() {}, nil, err
 	}
 
 	podObject := &corev1.Pod{
 		ObjectMeta: metav1.ObjectMeta{
 			Namespace:    p.Namespace,
-			GenerateName: fmt.Sprintf("localizer-%s", p.ServiceName),
+			GenerateName: fmt.Sprintf("localizer-%s-", p.ServiceName),
 			Annotations: map[string]string{
 				proxier.ExposedAnnotation:          "true",
 				proxier.ExposedLocalPortAnnotation: string(exposedPortsJSON),
@@ -142,10 +160,10 @@ func (p *ServiceForward) createServerPodAndTransport(ctx context.Context) (clean
 
 	po, err := p.c.k.CoreV1().Pods(p.Namespace).Create(ctx, podObject, metav1.CreateOptions{})
 	if err != nil {
-		return func() {}, 0, errors.Wrap(err, "failed to create pod")
+		return func() {}, nil, errors.Wrap(err, "failed to create pod")
 	}
 
-	cleanupFn = func() {
+	cleanupFn := func() {
 		p.c.log.Debug("cleaning up pod")
 		// cleanup the pod
 		//nolint:errcheck
@@ -155,9 +173,8 @@ func (p *ServiceForward) createServerPodAndTransport(ctx context.Context) (clean
 	p.c.log.Infof("created pod %s", po.ObjectMeta.Name)
 
 	p.c.log.Info("waiting for remote pod to be ready ...")
-	t := time.NewTicker(10 * time.Second)
+	t := time.NewTicker(3 * time.Second)
 
-loop:
 	for {
 		select {
 		case <-t.C:
@@ -173,58 +190,54 @@ loop:
 			for _, cond := range po.Status.Conditions {
 				if cond.Type == corev1.PodReady {
 					if cond.Status == corev1.ConditionTrue {
-						break loop
+						return cleanupFn, po, nil
 					}
 				}
 			}
 		case <-ctx.Done():
 			cleanupFn()
-			return func() {}, 0, ctx.Err()
+			return func() {}, nil, ctx.Err()
 		}
 	}
+}
 
-	p.c.log.Info("pod is ready, creating tunnel")
-
-	fw, err := p.createServerPortForward(ctx, po)
+func (p *ServiceForward) createTransport(ctx context.Context, po *corev1.Pod, localPort int) (int, *portforward.PortForwarder, error) {
+	fw, err := p.createServerPortForward(ctx, po, localPort)
 	if err != nil {
-		cleanupFn()
-		return func() {}, 0, errors.Wrap(err, "failed to create tunnel for underlying transport")
+		return 0, nil, errors.Wrap(err, "failed to create tunnel for underlying transport")
 	}
 
 	fw.Ready = make(chan struct{})
-	go func() {
-		// TODO(jaredallard): reconnect logic that works with context
-		if err := fw.ForwardPorts(); err != nil {
-			p.c.log.WithError(err).Error("underlying transport died")
-		}
-	}()
+
+	go fw.ForwardPorts()
 
 	p.c.log.Debug("waiting for transport to be marked as ready")
 	select {
 	case <-fw.Ready:
+	case <-time.After(time.Second * 10):
+		return 0, nil, fmt.Errorf("deadline exceeded")
 	case <-ctx.Done():
-		cleanupFn()
-		return func() {}, 0, ctx.Err()
-	}
-
-	ports, err := fw.GetPorts()
-	if err != nil {
-		cleanupFn()
-		return func() {}, 0, errors.Wrap(err, "failed to get generated underlying transport port")
+		return 0, nil, ctx.Err()
 	}
 
-	for _, p := range ports {
-		if p.Remote == 2222 {
-			port = int(p.Local)
+	// only find the port if we don't already know it
+	if localPort == 0 {
+		fwPorts, err := fw.GetPorts()
+		if err != nil {
+			return 0, nil, errors.Wrap(err, "failed to get generated underlying transport port")
 		}
-	}
-
-	if port == 0 {
-		cleanupFn()
-		return func() {}, 0, fmt.Errorf("failed to determine the generated underlying transport port")
+
+		for _, p := range fwPorts {
+			if p.Remote == 2222 {
+				localPort = int(p.Local)
+			}
+		}
+
+		if localPort == 0 {
+			return 0, nil, fmt.Errorf("failed to determine the generated underlying transport port")
+		}
 	}
 
-	return cleanupFn, port, nil
+	return localPort, fw, nil
 }
 
 // Start starts forwarding a service
@@ -236,12 +249,6 @@ func (p *ServiceForward) Start(ctx context.Context) error {
 		p.c.log.Debugf("tunneling port %v", ports[i])
 	}
 
-	cleanupFn, localPort, err := p.createServerPodAndTransport(ctx)
-	if err != nil {
-		return errors.Wrap(err, "failed to create server and/or transport")
-	}
-	defer cleanupFn()
-
 	// scale down the other resources that powered this service
 	for _, o := range p.objects {
 		p.c.log.Infof("scaling %s from %d -> 0", o.GetKey(), o.Replicas)
@@ -259,15 +266,80 @@ func (p *ServiceForward) Start(ctx context.Context) error {
 		}
 	}()
 
-	p.c.log.Debug("creating tunnel client")
-	cli := ssh.NewReverseTunnelClient(p.c.log, "127.0.0.1", localPort, ports)
-	err = cli.Start(ctx)
-	if err != nil {
-		return errors.Wrap(err, "tunnel client failed")
-	}
+	// TODO(jaredallard): handle pod being destroyed
+	lastErr := ErrNotInitialized
+	localPort := 0
+	cleanupFn := func() {}
+
+	var po *corev1.Pod
+	var fw *portforward.PortForwarder
+	go func() {
+		for {
+			var err error
+			select {
+			case <-ctx.Done():
+				return
+			default:
+				if lastErr == ErrNotInitialized {
+					p.c.log.Debug("creating tunnel connection")
+				} else {
+					p.c.log.WithError(lastErr).Error("connection died, recreating tunnel connection")
+				}
+
+				if lastErr != ErrNotInitialized {
+					// we can't really do exponential backoff right now, so do a set time
+					select {
+					case <-ctx.Done():
+						return
+					case <-time.After(time.Second * 5):
+					}
+				} else {
+					// reset lastErr at this point, since we were not initialized
+					lastErr = nil
+				}
+
+				// clean up the old pod, if it exists
+				cleanupFn()
+
+				cleanupFn, po, err = p.createServerPod(ctx)
+				if err != nil {
+					p.c.log.WithError(err).Debug("failed to create pod")
+					lastErr = ErrUnderlyingTransportPodDestroyed
+					continue
+				}
+
+				localPort, fw, err = p.createTransport(ctx, po, 0)
+				if err != nil {
+					if fw != nil {
+						fw.Close()
+					}
+
+					p.c.log.WithError(err).Debug("failed to recreate transport port-forward")
+					lastErr = ErrUnderlyingTransportDied
+					continue
+				}
+
+				cli := ssh.NewReverseTunnelClient(p.c.log, "127.0.0.1", localPort, ports)
+				err = cli.Start(ctx)
+				if err != nil {
+					p.c.log.WithError(err).Debug("failed to recreate transport")
+					lastErr = ErrUnderlyingTransportProtocolDied
+				} else {
+					p.c.log.Debug("transport died")
+					lastErr = ErrUnderlyingTransportDied
+				}
+
+				// cleanup the port-forward if the above died
+				if fw != nil {
+					fw.Close()
+				}
+			}
+		}
+	}()
 
 	// wait for the context to finish
 	<-ctx.Done()
+	cleanupFn()
 
 	return nil
 }
diff --git a/internal/proxier/proxy_connection.go b/internal/proxier/proxy_connection.go
index 7b48be1..9d4c845 100644
--- a/internal/proxier/proxy_connection.go
+++ b/internal/proxier/proxy_connection.go
@@ -82,7 +82,6 @@ func (pc *ProxyConnection) Start(ctx context.Context) error {
 	}
 
 	go func() {
-		// TODO(jaredallard): Figure out a way to better backoff errors here
 		if err := fw.ForwardPorts(); err != nil {
 			// if this dies, mark the connection as inactive for
 			// the connection reaper
diff --git a/internal/ssh/client.go b/internal/ssh/client.go
index 17d7deb..1fbc397 100644
--- a/internal/ssh/client.go
+++ b/internal/ssh/client.go
@@ -3,6 +3,7 @@ package ssh
 import (
 	"context"
 	"fmt"
+	"io"
 	"net"
 	"strings"
 	"sync"
@@ -73,6 +74,8 @@ func NewReverseTunnelClient(l logrus.FieldLogger, host string, port int, ports [
 	return &Client{l, host, port, portMap}
 }
 
+// Start starts the ssh tunnel. This blocks until
+// all listeners have closed.
 func (c *Client) Start(ctx context.Context) error {
 	dialer := net.Dialer{
 		Timeout: 10 * time.Second,
@@ -83,6 +86,7 @@ func (c *Client) Start(ctx context.Context) error {
 	if err != nil {
 		return err
 	}
+	defer conn.Close()
 
 	sconn, chans, reqs, err := ssh.NewClientConn(conn, addr, &ssh.ClientConfig{
 		User: "outreach",
@@ -125,7 +129,11 @@ func (c *Client) Start(ctx context.Context) error {
 			}
 			client, err := listener.Accept()
 			if err != nil {
-				c.log.WithError(err).Errorf("failed to accept traffic on remote listener")
+				if err != io.EOF {
+					c.log.WithError(err).Errorf("failed to accept traffic on remote listener")
+				} else {
+					c.log.Warnf("remote listener closed the connection")
+				}
 				return
 			}
 
@@ -136,7 +144,6 @@ func (c *Client) Start(ctx context.Context) error {
 		}(remotePort)
 	}
 
-	<-ctx.Done()
 	wg.Wait()
 
 	return nil
@@ -160,6 +167,6 @@ func (c *Client) handleReverseForwardConn(client net.Conn, localAddr string) {
 	//  - the "client" and "remote" strings we give Pipe() is just for error&log messages
 	//  - this blocks until either of the parties' socket closes (or breaks)
 	if err := bidipipe.Pipe(bidipipe.WithName("client", client), bidipipe.WithName("remote", remote)); err != nil {
-		c.log.WithError(err).Errorf("tunnel failed")
+		c.log.WithError(err).Warnf("failed to send data over tunnel")
 	}
 }
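
Note on the retry delay: the reconnect loop in internal/expose/port.go above waits a flat 5 seconds between attempts ("we can't really do exponential backoff right now, so do a set time"). The standalone sketch below is not part of the patch; it only illustrates how a capped exponential backoff could replace that fixed delay. The backoffFor helper and its constants are hypothetical, not existing localizer APIs.

// Illustrative sketch only: capped exponential backoff for a reconnect loop,
// assuming a hypothetical backoffFor helper (not part of the patch).
package main

import (
	"context"
	"fmt"
	"time"
)

// backoffFor doubles the base delay for each prior failed attempt,
// capping the result at maxDelay.
func backoffFor(attempt int, base, maxDelay time.Duration) time.Duration {
	d := base
	for i := 0; i < attempt && d < maxDelay; i++ {
		d *= 2
	}
	if d > maxDelay {
		d = maxDelay
	}
	return d
}

func main() {
	// stand-in for the localizer run context
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
	defer cancel()

	for attempt := 0; ; attempt++ {
		// ... (re)create the pod, port-forward, and ssh tunnel here ...

		delay := backoffFor(attempt, time.Second, 30*time.Second)
		fmt.Printf("attempt %d failed, retrying in %s\n", attempt, delay)

		select {
		case <-ctx.Done():
			fmt.Println("context cancelled, giving up")
			return
		case <-time.After(delay):
		}
	}
}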