Skip to content

Commit

Permalink
feat: hack in support for first pod in a statefulset (#23)
Browse files Browse the repository at this point in the history
  • Loading branch information
jaredallard authored Oct 20, 2020
1 parent 233dbf0 commit 72b8612
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 37 deletions.
16 changes: 9 additions & 7 deletions internal/proxier/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ const (
type Service struct {
Name string
Namespace string
Ports []*ServicePort

// Type is if it's a: ClusterIP, or StatefulSet
Type string
Ports []*ServicePort
}

// GetKey returns a cache stable key for a Service
Expand All @@ -58,16 +61,11 @@ func CreateServiceFromKubernetesService(ctx context.Context, olog logrus.FieldLo
Name: kserv.Name,
Namespace: kserv.Namespace,
Ports: make([]*ServicePort, 0),
Type: "ClusterIP",
}
key := serv.GetKey()
log := olog.WithField("service", key)

// TODO: handle
// In general we don't support non-clusterIP services
if kserv.Spec.ClusterIP == "None" {
return Service{}, fmt.Errorf("service had no cluster ip")
}

// skip services that have no ports
if len(kserv.Spec.Ports) == 0 {
return Service{}, fmt.Errorf("service had no defined ports")
Expand All @@ -78,6 +76,10 @@ func CreateServiceFromKubernetesService(ctx context.Context, olog logrus.FieldLo
return Service{}, fmt.Errorf("service had no selector")
}

if kserv.Spec.ClusterIP == "None" {
serv.Type = "StatefulSet"
}

// convert the Kubernetes ports into our own internal data model
// we also handle overriding localPorts via the RemapAnnotation here.
servicePorts, exists, err := kube.ResolveServicePorts(ctx, k, kserv)
Expand Down
17 changes: 1 addition & 16 deletions internal/proxier/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,22 +171,7 @@ func (p *Proxier) handleInformerEvent(ctx context.Context, event string, obj int
p.connMutex.Unlock()

p.log.Warnf("tunnel for %s is being refreshed due to underlying pod being destroyed", serviceKey)
ticker := backoff.NewTicker(backoff.NewExponentialBackOff())
for {
select {
case <-ticker.C:
if err := p.createProxy(ctx, &s); err != nil { //nolint:scopelint
p.log.WithError(err).Warnf("failed to refresh tunnel for %s (trying again)", serviceKey)
continue
}
ticker.Stop()
p.log.Infof("refreshed tunnel for '%s'", serviceKey)
return
case <-ctx.Done():
return
}
}

p.CreateProxy(ctx, &s)
case "endpoint":
if event == "delete" {
return
Expand Down
36 changes: 22 additions & 14 deletions internal/proxier/proxy_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ import (
"github.com/jaredallard/localizer/internal/kube"
"github.com/metal-stack/go-ipam"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/portforward"
)

Expand All @@ -50,18 +48,32 @@ type ProxyConnection struct {
// for a given kubernetes service
func (pc *ProxyConnection) GetAddresses() []string {
s := pc.Service

name := s.Name
namespace := s.Namespace

// TODO: This is a quick hack for -0
// but ultimately we need to support all the pods
// in a statefulset service
if s.Type == "StatefulSet" {
pc.proxier.log.WithField("service", pc.Service.GetKey()).Debug("proxying a statefulset")
name = pc.Pod.Name + "." + s.Name
}

return []string{
s.Name,
fmt.Sprintf("%s.%s", s.Name, s.Namespace),
fmt.Sprintf("%s.%s.svc", s.Name, s.Namespace),
fmt.Sprintf("%s.%s.svc.cluster", s.Name, s.Namespace),
fmt.Sprintf("%s.%s.svc.cluster.local", s.Name, s.Namespace),
name,
fmt.Sprintf("%s.%s", name, namespace),
fmt.Sprintf("%s.%s.svc", name, namespace),
fmt.Sprintf("%s.%s.svc.cluster", name, namespace),
fmt.Sprintf("%s.%s.svc.cluster.local", name, namespace),
}
}

// Start starts a proxy connection
func (pc *ProxyConnection) Start(ctx context.Context) error {
serviceKey := pc.Service.GetKey()
log := pc.proxier.log.WithField("service", serviceKey)

ipAddress, err := pc.proxier.allocateIP(serviceKey)
if err != nil {
return errors.Wrap(err, "failed to allocate IP")
Expand All @@ -75,7 +87,7 @@ func (pc *ProxyConnection) Start(ctx context.Context) error {
pc.fw = fw

// only add addresses for services we actually are routing to
pc.proxier.log.Debugf("adding hosts file entry for service '%s'", serviceKey)
log.Debugf("adding hosts file entry for service")
pc.proxier.hosts.AddHosts(pc.IP.IP.String(), pc.GetAddresses())
if err := pc.proxier.hosts.Save(); err != nil {
return errors.Wrap(err, "failed to save address to hosts")
Expand All @@ -88,11 +100,7 @@ func (pc *ProxyConnection) Start(ctx context.Context) error {
pc.Close()
pc.fw = nil

k, _ := cache.MetaNamespaceKeyFunc(pc.Service)
pc.proxier.log.WithError(err).WithFields(logrus.Fields{
"ports": pc.Ports,
"service": k,
}).Debug("tunnel died")
log.WithError(err).WithField("ports", pc.Ports).Debug("tunnel died")

// trigger the recreate logic
pc.proxier.handleInformerEvent(ctx, "connection-dead", pc)
Expand All @@ -117,7 +125,7 @@ func (pc *ProxyConnection) Close() error {
}

// cleanup the DNS entries for this ProxyConnection
pc.proxier.hosts.RemoveAddresses(pc.GetAddresses())
pc.proxier.hosts.RemoveHosts(pc.GetAddresses())
if err := pc.proxier.hosts.Save(); err != nil {
return errors.Wrap(err, "failed to remove hosts entry")
}
Expand Down

0 comments on commit 72b8612

Please sign in to comment.