feat(expose): support E2E transport reconnect, cleanup klog (#18)
jaredallard authored Oct 2, 2020
1 parent 7c4227c commit 7a17ee8
Showing 5 changed files with 198 additions and 54 deletions.
64 changes: 64 additions & 0 deletions cmd/localizer/localizer.go
@@ -16,13 +16,15 @@ package main
 import (
 	"context"
 	"fmt"
+	"io/ioutil"
 	"os"
 	"os/signal"
 	"os/user"
 	"strconv"
 	"strings"
 	"syscall"
 
+	"github.com/go-logr/logr"
 	"github.com/jaredallard/localizer/internal/expose"
 	"github.com/jaredallard/localizer/internal/kube"
 	"github.com/jaredallard/localizer/internal/proxier"
@@ -33,8 +35,65 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
)

type klogToLogrus struct {
log logrus.FieldLogger
}

// Enabled tests whether this Logger is enabled. For example, commandline
// flags might be used to set the logging verbosity and disable some info
// logs.
func (l *klogToLogrus) Enabled() bool {
return true
}

// Info logs a non-error message with the given key/value pairs as context.
//
// The msg argument should be used to add some constant description to
// the log line. The key/value pairs can then be used to add additional
// variable information. The key/value pairs should alternate string
// keys and arbitrary values.
func (l *klogToLogrus) Info(msg string, keysAndValues ...interface{}) {
l.log.Info(msg)
}

// Error logs an error, with the given message and key/value pairs as context.
// It functions similarly to calling Info with the "error" named value, but may
// have unique behavior, and should be preferred for logging errors (see the
// package documentations for more information).
//
// The msg field should be used to add context to any underlying error,
// while the err field should be used to attach the actual error that
// triggered this log line, if present.
func (l *klogToLogrus) Error(err error, msg string, keysAndValues ...interface{}) {
l.log.WithError(err).Error(msg)
}

// V returns an Logger value for a specific verbosity level, relative to
// this Logger. In other words, V values are additive. V higher verbosity
// level means a log message is less important. It's illegal to pass a log
// level less than zero.
func (l *klogToLogrus) V(level int) logr.Logger {
return l
}

// WithValues adds some key-value pairs of context to a logger.
// See Info for documentation on how key/value pairs work.
func (l *klogToLogrus) WithValues(keysAndValues ...interface{}) logr.Logger {
return l
}

// WithName adds a new element to the logger's name.
// Successive calls with WithName continue to append
// suffixes to the logger's name. It's strongly reccomended
// that name segments contain only letters, digits, and hyphens
// (see the package documentation for more information).
func (l *klogToLogrus) WithName(name string) logr.Logger {
return l
}

func main() { //nolint:funlen,gocyclo
ctx, cancel := context.WithCancel(context.Background())
log := logrus.New()
@@ -192,6 +251,11 @@ func main() { //nolint:funlen,gocyclo
 		log.Debug("set log format to JSON")
 	}
 
+	// disable client-go logging
+	discardLogger := logrus.New()
+	discardLogger.Out = ioutil.Discard
+	klog.SetLogger(&klogToLogrus{log: discardLogger})
+
 	kconf, k, err = kube.GetKubeClient(c.String("context"))
 	if err != nil {
 		return errors.Wrap(err, "failed to create kube client")
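Note: with logr v0.2.0 (the version pinned in go.mod below), logr.Logger is still an interface, not the struct it became in logr v1.x, which is why the adapter can simply return itself from V, WithValues, and WithName. A compile-time assertion (not part of this commit) would confirm the adapter satisfies it:

	var _ logr.Logger = &klogToLogrus{}

The adapter only bridges the two APIs; what actually silences client-go's internal logging is pointing klog at a logrus instance whose output is ioutil.Discard.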
2 changes: 2 additions & 0 deletions go.mod
@@ -9,6 +9,7 @@ require (
 	github.com/elazarl/goproxy v0.0.0-20191011121108-aa519ddbe484 // indirect
 	github.com/elazarl/goproxy/ext v0.0.0-20191011121108-aa519ddbe484 // indirect
 	github.com/function61/gokit v0.0.0-20200923114939-f8d7e065a5c3
+	github.com/go-logr/logr v0.2.0
 	github.com/imdario/mergo v0.3.8 // indirect
 	github.com/metal-stack/go-ipam v1.6.0
 	github.com/pkg/errors v0.9.1
@@ -21,6 +22,7 @@ require (
 	k8s.io/api v0.19.2
 	k8s.io/apimachinery v0.19.2
 	k8s.io/client-go v0.19.2
+	k8s.io/klog/v2 v2.2.0
 	k8s.io/kubectl v0.19.2
 )
172 changes: 122 additions & 50 deletions internal/expose/port.go
@@ -32,6 +32,24 @@ import (
 	"k8s.io/client-go/tools/portforward"
 )
 
+var (
+	// ErrUnderlyingTransportDied is triggered when the Kubernetes port-forward loses
+	// its connection. This results in the transport protocol dying as well.
+	ErrUnderlyingTransportDied = errors.New("underlying transport died")
+
+	// ErrUnderlyingTransportProtocolDied is triggered when the SSH tunnel loses its connection;
+	// this can be due to the SSH connection being destroyed or the port-forward being killed.
+	ErrUnderlyingTransportProtocolDied = errors.New("underlying transport protocol (ssh) died")
+
+	// ErrNotInitialized is used to start the initialization
+	// process. It is not an error, despite its name.
+	ErrNotInitialized = errors.New("not initialized")
+
+	// ErrUnderlyingTransportPodDestroyed is triggered only when a pod is destroyed;
+	// note that this will usually cause ErrUnderlyingTransportDied to be triggered as well.
+	ErrUnderlyingTransportPodDestroyed = errors.New("underlying transport pod died")
+)
+
 type ServiceForward struct {
 	c *Client
 
@@ -56,11 +74,11 @@ func (s *scaledObjectType) GetKey() string {
 	return strings.ToLower(fmt.Sprintf("%s/%s/%s", s.Kind, s.Namespace, s.Name))
 }
 
-func (p *ServiceForward) createServerPortForward(ctx context.Context, po *corev1.Pod) (*portforward.PortForwarder, error) {
-	return kube.CreatePortForward(ctx, p.c.k.CoreV1().RESTClient(), p.c.kconf, po, "0.0.0.0", []string{"0:2222"})
+func (p *ServiceForward) createServerPortForward(ctx context.Context, po *corev1.Pod, localPort int) (*portforward.PortForwarder, error) {
+	return kube.CreatePortForward(ctx, p.c.k.CoreV1().RESTClient(), p.c.kconf, po, "0.0.0.0", []string{fmt.Sprintf("%d:2222", localPort)})
 }
 
-func (p *ServiceForward) createServerPodAndTransport(ctx context.Context) (cleanupFn func(), port int, err error) { //nolint:funlen,gocyclo
+func (p *ServiceForward) createServerPod(ctx context.Context) (func(), *corev1.Pod, error) { //nolint:funlen,gocyclo
 	// map the service ports into containerPorts
 	containerPorts := make([]corev1.ContainerPort, len(p.Ports))
 	for i, port := range p.Ports {
@@ -78,13 +96,13 @@
 	// create a pod for our new expose service
 	exposedPortsJSON, err := json.Marshal(containerPorts)
 	if err != nil {
-		return func() {}, 0, err
+		return func() {}, nil, err
 	}
 
 	podObject := &corev1.Pod{
 		ObjectMeta: metav1.ObjectMeta{
 			Namespace:    p.Namespace,
-			GenerateName: fmt.Sprintf("localizer-%s", p.ServiceName),
+			GenerateName: fmt.Sprintf("localizer-%s-", p.ServiceName),
 			Annotations: map[string]string{
 				proxier.ExposedAnnotation:          "true",
 				proxier.ExposedLocalPortAnnotation: string(exposedPortsJSON),
@@ -142,10 +160,10 @@
 
 	po, err := p.c.k.CoreV1().Pods(p.Namespace).Create(ctx, podObject, metav1.CreateOptions{})
 	if err != nil {
-		return func() {}, 0, errors.Wrap(err, "failed to create pod")
+		return func() {}, nil, errors.Wrap(err, "failed to create pod")
 	}
 
-	cleanupFn = func() {
+	cleanupFn := func() {
 		p.c.log.Debug("cleaning up pod")
 		// cleanup the pod
 		//nolint:errcheck
@@ -155,9 +173,8 @@
p.c.log.Infof("created pod %s", po.ObjectMeta.Name)

p.c.log.Info("waiting for remote pod to be ready ...")
t := time.NewTicker(10 * time.Second)
t := time.NewTicker(3 * time.Second)

loop:
for {
select {
case <-t.C:
Expand All @@ -173,58 +190,54 @@ loop:
 			for _, cond := range po.Status.Conditions {
 				if cond.Type == corev1.PodReady {
 					if cond.Status == corev1.ConditionTrue {
-						break loop
+						return cleanupFn, po, nil
 					}
 				}
 			}
 		case <-ctx.Done():
 			cleanupFn()
-			return func() {}, 0, ctx.Err()
+			return func() {}, nil, ctx.Err()
 		}
 	}
 }
 
-	p.c.log.Info("pod is ready, creating tunnel")
-
-	fw, err := p.createServerPortForward(ctx, po)
+func (p *ServiceForward) createTransport(ctx context.Context, po *corev1.Pod, localPort int) (int, *portforward.PortForwarder, error) {
+	fw, err := p.createServerPortForward(ctx, po, localPort)
 	if err != nil {
-		cleanupFn()
-		return func() {}, 0, errors.Wrap(err, "failed to create tunnel for underlying transport")
+		return 0, nil, errors.Wrap(err, "failed to create tunnel for underlying transport")
 	}
 
 	fw.Ready = make(chan struct{})
-	go func() {
-		// TODO(jaredallard): reconnect logic that works with context
-		if err := fw.ForwardPorts(); err != nil {
-			p.c.log.WithError(err).Error("underlying transport died")
-		}
-	}()
+
+	go fw.ForwardPorts() //nolint:errcheck
 
 	p.c.log.Debug("waiting for transport to be marked as ready")
 	select {
 	case <-fw.Ready:
+	case <-time.After(time.Second * 10):
+		return 0, nil, fmt.Errorf("deadline exceeded")
 	case <-ctx.Done():
-		cleanupFn()
-		return func() {}, 0, ctx.Err()
+		return 0, nil, ctx.Err()
 	}
 
-	ports, err := fw.GetPorts()
-	if err != nil {
-		cleanupFn()
-		return func() {}, 0, errors.Wrap(err, "failed to get generated underlying transport port")
-	}
-
-	for _, p := range ports {
-		if p.Remote == 2222 {
-			port = int(p.Local)
-		}
-	}
-
-	if port == 0 {
-		cleanupFn()
-		return func() {}, 0, fmt.Errorf("failed to determine the generated underlying transport port")
-	}
-
-	return cleanupFn, port, nil
+	// only find the port if we don't already know it
+	if localPort == 0 {
+		fwPorts, err := fw.GetPorts()
+		if err != nil {
+			return 0, nil, errors.Wrap(err, "failed to get generated underlying transport port")
+		}
+
+		for _, p := range fwPorts {
+			if p.Remote == 2222 {
+				localPort = int(p.Local)
+			}
+		}
+
+		if localPort == 0 {
+			return 0, nil, fmt.Errorf("failed to determine the generated underlying transport port")
+		}
+	}
+
+	return localPort, fw, nil
 }
 
 // Start starts forwarding a service
@@ -236,12 +249,6 @@ func (p *ServiceForward) Start(ctx context.Context) error {
p.c.log.Debugf("tunneling port %v", ports[i])
}

cleanupFn, localPort, err := p.createServerPodAndTransport(ctx)
if err != nil {
return errors.Wrap(err, "failed to create server and/or transport")
}
defer cleanupFn()

// scale down the other resources that powered this service
for _, o := range p.objects {
p.c.log.Infof("scaling %s from %d -> 0", o.GetKey(), o.Replicas)
Expand All @@ -259,15 +266,80 @@ func (p *ServiceForward) Start(ctx context.Context) error {
 		}
 	}()
 
-	p.c.log.Debug("creating tunnel client")
-	cli := ssh.NewReverseTunnelClient(p.c.log, "127.0.0.1", localPort, ports)
-	err = cli.Start(ctx)
-	if err != nil {
-		return errors.Wrap(err, "tunnel client failed")
-	}
+	// TODO(jaredallard): handle pod being destroyed
+	lastErr := ErrNotInitialized
+	localPort := 0
+	cleanupFn := func() {}
+
+	var po *corev1.Pod
+	var fw *portforward.PortForwarder
+	go func() {
+		for {
+			var err error
+			select {
+			case <-ctx.Done():
+				return
+			default:
+				if lastErr == ErrNotInitialized {
+					p.c.log.Debug("creating tunnel connection")
+				} else {
+					p.c.log.WithError(lastErr).Error("connection died, recreating tunnel connection")
+				}
+
+				if lastErr != ErrNotInitialized {
+					// we can't really do exponential backoff right now, so sleep a set time
+					select {
+					case <-ctx.Done():
+						return
+					case <-time.After(time.Second * 5):
+					}
+				} else {
+					// reset our err at this point, if we were not initialized
+					lastErr = nil
+				}
+
+				// clean up the old pod, if it exists
+				cleanupFn()
+
+				cleanupFn, po, err = p.createServerPod(ctx)
+				if err != nil {
+					p.c.log.WithError(err).Debug("failed to create pod")
+					lastErr = ErrUnderlyingTransportPodDestroyed
+					continue
+				}
+
+				localPort, fw, err = p.createTransport(ctx, po, 0)
+				if err != nil {
+					if fw != nil {
+						fw.Close()
+					}
+
+					p.c.log.WithError(err).Debug("failed to recreate transport port-forward")
+					lastErr = ErrUnderlyingTransportDied
+					continue
+				}
+
+				cli := ssh.NewReverseTunnelClient(p.c.log, "127.0.0.1", localPort, ports)
+				err = cli.Start(ctx)
+				if err != nil {
+					p.c.log.WithError(err).Debug("failed to recreate transport")
+					lastErr = ErrUnderlyingTransportProtocolDied
+				} else {
+					p.c.log.Debug("transport died")
+					lastErr = ErrUnderlyingTransportDied
+				}
+
+				// cleanup the port-forward if the above died
+				if fw != nil {
+					fw.Close()
+				}
+			}
+		}
+	}()
 
+	// wait for the context to finish
 	<-ctx.Done()
 
+	cleanupFn()
 	return nil
 }
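The sentinel errors introduced at the top of this file never leave Start; they only drive the reconnect loop's logging and retry decisions. If a later change surfaced them to callers, pkg/errors v0.9+ (already in go.mod) supports stdlib-style matching; a hypothetical, illustrative sketch only, since Start currently returns nil once the context is done:

	if err := forward.Start(ctx); err != nil {
		switch {
		case errors.Is(err, expose.ErrUnderlyingTransportDied):
			// the Kubernetes port-forward dropped
		case errors.Is(err, expose.ErrUnderlyingTransportProtocolDied):
			// the SSH tunnel inside the port-forward dropped
		}
	}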
1 change: 0 additions & 1 deletion internal/proxier/proxy_connection.go
@@ -82,7 +82,6 @@ func (pc *ProxyConnection) Start(ctx context.Context) error {
 	}
 
 	go func() {
-		// TODO(jaredallard): Figure out a way to better backoff errors here
 		if err := fw.ForwardPorts(); err != nil {
 			// if this dies, mark the connection as inactive for
 			// the connection reaper
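Neither retry path has real backoff yet: port.go's new reconnect loop notes it "can't really do exponential backoff right now" and sleeps a fixed five seconds. A context-aware capped exponential backoff is the natural follow-up; a minimal sketch (the helper name is hypothetical, not part of this commit):

	// sleepBackoff waits 1s, 2s, 4s, ... capped at 32s, returning early
	// if the context is canceled.
	func sleepBackoff(ctx context.Context, attempt int) error {
		if attempt > 5 {
			attempt = 5 // cap the delay at 2^5 = 32 seconds
		}
		delay := time.Second * time.Duration(1<<attempt)
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(delay):
			return nil
		}
	}

Swapping the fixed time.After(time.Second * 5) for sleepBackoff(ctx, attempt), and resetting attempt after a successful connection, would keep reconnects fast while easing pressure on a flapping cluster.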