Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(healthchecks): Adding Agent Health Checks for legacy & hubble control planes #1092

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
12 changes: 12 additions & 0 deletions cmd/hubble/daemon_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package hubble
import (
"context"
"fmt"
"os"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
Expand All @@ -32,6 +33,7 @@ import (
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/healthz"
logf "sigs.k8s.io/controller-runtime/pkg/log"
zapf "sigs.k8s.io/controller-runtime/pkg/log/zap"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
Expand Down Expand Up @@ -66,6 +68,16 @@ var (
return nil, nil, fmt.Errorf("creating new controller-runtime manager: %w", err)
}

if err := ctrlManager.AddHealthzCheck("healthz", healthz.Ping); err != nil {
mereta marked this conversation as resolved.
Show resolved Hide resolved
logger.Error("unable to set up healthz check", err)
os.Exit(1)
mereta marked this conversation as resolved.
Show resolved Hide resolved
}

if err := ctrlManager.AddReadyzCheck("readyz", healthz.Ping); err != nil {
logger.Error("unable to set up readyz check", err)
os.Exit(1)
}

mereta marked this conversation as resolved.
Show resolved Hide resolved
return ctrlManager, ctrlManager.GetClient(), nil
}),

Expand Down
39 changes: 31 additions & 8 deletions cmd/legacy/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package legacy

import (
"fmt"
"net/http"
"os"
"strings"
"time"
Expand Down Expand Up @@ -55,6 +56,10 @@ const (
nodeIPEnvKey = "NODE_IP"
)

var (
healthzChecker healthz.Checker
)

var scheme = k8sruntime.NewScheme()

func init() {
Expand Down Expand Up @@ -215,14 +220,6 @@ func (d *Daemon) Start() error {
}

//+kubebuilder:scaffold:builder

if healthCheckErr := mgr.AddHealthzCheck("healthz", healthz.Ping); healthCheckErr != nil {
mainLogger.Fatal("Unable to set up health check", zap.Error(healthCheckErr))
}
if addReadyCheckErr := mgr.AddReadyzCheck("readyz", healthz.Ping); addReadyCheckErr != nil {
mainLogger.Fatal("Unable to set up ready check", zap.Error(addReadyCheckErr))
}

// k8s Client used for informers
cl := kubernetes.NewForConfigOrDie(mgr.GetConfig())

Expand Down Expand Up @@ -294,6 +291,21 @@ func (d *Daemon) Start() error {
mainLogger.Fatal("unable to create metricsConfigController", zap.Error(err))
}
}

// Define a custom health check for advanced metrics
healthzChecker = healthz.CheckHandler{
Checker: healthz.Checker(func(req *http.Request) error {
_, err := metricsModule.Status()
if err != nil {
mainLogger.Error("failed to get metrics module status fr advanced metrics", zap.Error(err))
return err
}
return nil
}),
}.Checker
} else {
// Advanced Metric not enabled, Ping healthcheck
healthzChecker = healthz.Ping
mereta marked this conversation as resolved.
Show resolved Hide resolved
}

controllerMgr, err := cm.NewControllerManager(daemonConfig, cl, tel)
Expand All @@ -315,6 +327,17 @@ func (d *Daemon) Start() error {
go controllerMgr.Start(ctx)
mainLogger.Info("Started controller manager")

//Set health checks according to retina confiuration
if err := mgr.AddHealthzCheck("healthz", healthzChecker); err != nil {
mainLogger.Error("unable to set up custom health check", zap.Error(err))
os.Exit(1)
}

if err := mgr.AddReadyzCheck("readyz", healthzChecker); err != nil {
mainLogger.Error("unable to set up custom ready check", zap.Error(err))
os.Exit(1)
}

mereta marked this conversation as resolved.
Show resolved Hide resolved
// Start all registered controllers. This will block until container receives SIGTERM.
if err := mgr.Start(ctx); err != nil {
mainLogger.Fatal("unable to start manager", zap.Error(err))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,22 @@ spec:
cpu: {{ .Values.resources.limits.cpu | quote }}
readinessProbe:
httpGet:
path: /metrics
port: {{ .Values.agent.container.retina.ports.containerPort }}
initialDelaySeconds: 10
periodSeconds: 30
path: /readyz
port: {{ .Values.agent.container.retina.ports.healthPort }}
initialDelaySeconds: {{ .Values.readinessProbe.initialDelaySeconds | default "30" }}
periodSeconds: {{ .Values.readinessProbe.periodSeconds | default "30" }}
timeoutSeconds: {{ .Values.readinessProbe.timeoutSeconds | default "1" }}
failureThreshold: {{ .Values.readinessProbe.failureThreshold | default "3" }}
successThreshold: {{ .Values.readinessProbe.successThreshold | default "1" }}
livenessProbe:
httpGet:
path: /healthz
port: {{ .Values.agent.container.retina.ports.healthPort }}
initialDelaySeconds: {{ .Values.livenessProbe.initialDelaySeconds | default "30" }}
periodSeconds: {{ .Values.livenessProbe.periodSeconds | default "30" }}
timeoutSeconds: {{ .Values.livenessProbe.timeoutSeconds | default "1" }}
failureThreshold: {{ .Values.livenessProbe.failureThreshold | default "3" }}
successThreshold: {{ .Values.livenessProbe.successThreshold | default "1" }}
env:
- name: POD_NAME
valueFrom:
Expand Down
27 changes: 27 additions & 0 deletions deploy/hubble/manifests/controller/helm/retina/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ agent:
metricsBindAddress: ":18080"
ports:
containerPort: 10093
healthPort: 18081

enablePodLevel: true
remoteContext: false
Expand Down Expand Up @@ -933,3 +934,29 @@ tls:
# -----BEGIN CERTIFICATE-----
# ...
# -----END CERTIFICATE-----
## @param readinessProbe.initialDelaySeconds [array] Initial delay seconds for readinessProbe
## @param readinessProbe.periodSeconds [array] Period seconds for readinessProbe
## @param readinessProbe.timeoutSeconds [array] Timeout seconds for readinessProbe
## @param readinessProbe.failureThreshold [array] Failure threshold for readinessProbe
## @param readinessProbe.successThreshold [array] Success threshold for readinessProbe
## Ref: https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-probes/#configure-probes
##
readinessProbe: {}
# initialDelaySeconds: 30
# periodSeconds: 10
# timeoutSeconds: 15
# failureThreshold: 5
# successThreshold: 1
## @param livenessProbe.initialDelaySeconds [array] Initial delay seconds for livenessProbe
## @param livenessProbe.periodSeconds [array] Period seconds for livenessProbe
## @param livenessProbe.timeoutSeconds [array] Timeout seconds for livenessProbe
## @param livenessProbe.failureThreshold [array] Failure threshold for livenessProbe
## @param livenessProbe.successThreshold [array] Success threshold for livenessProbe
## Ref: https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-probes/#configure-probes
##
livenessProbe: {}
# initialDelaySeconds: 30
# periodSeconds: 10
# timeoutSeconds: 15
# failureThreshold: 5
# successThreshold: 1
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,17 @@ spec:
- name: {{ include "retina.name" . }}
readinessProbe:
httpGet:
path: /metrics
port: {{ .Values.retinaPort }}
path: /readyz
port: {{ .Values.daemonset.container.retina.ports.healthPort }}
initialDelaySeconds: {{ .Values.readinessProbe.initialDelaySeconds | default "30" }}
periodSeconds: {{ .Values.readinessProbe.periodSeconds | default "30" }}
timeoutSeconds: {{ .Values.readinessProbe.timeoutSeconds | default "1" }}
failureThreshold: {{ .Values.readinessProbe.failureThreshold | default "3" }}
successThreshold: {{ .Values.readinessProbe.successThreshold | default "1" }}
livenessProbe:
httpGet:
path: /metrics
port: {{ .Values.retinaPort }}
path: /healthz
port: {{ .Values.daemonset.container.retina.ports.healthPort }}
initialDelaySeconds: {{ .Values.livenessProbe.initialDelaySeconds | default "30" }}
periodSeconds: {{ .Values.livenessProbe.periodSeconds | default "30" }}
timeoutSeconds: {{ .Values.livenessProbe.timeoutSeconds | default "1" }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ daemonset:
metricsBindAddress: ":18080"
ports:
containerPort: 10093
healthPort: 18081
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why create a new port? and not reuse the existing 10093 ? since we are a hostnet: true pod, any port number we use comes with restrictions as it takes away from nodes usable port ranges. In AKS, we will have to pre register the ports we will be using to make sure customers are also aware of the ports they should not use to avoid any conflict.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be already configured in Legacy & Hubble control planes as default HealthProbeBindAddress.
Its always provided when we set up k8s controller runtime options.
Hubble:

HealthProbeBindAddress: ":18001",

Legacy: https://github.com/microsoft/retina/blob/4b12472cc5007e0931238dd5e26f819e6be093b5/cmd/root.go#L42C10-L42C15

I think also @rbtr mentioned having a configurable port.
Do you think we should change to 10093?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok I did some testing and got errors when using port 10093.
I don't think we can reuse the Retina port, some findings below:

We start a http api server with that port in controller manager:

// create HTTP server for API server

And the trail for setting up health check is:
The HealthProbeBindAddress we pass as part of options in the ctrl manager ends up here where a listener is created.
comment:
This will throw an error if the bind address is invalid or already in use.
(same logic as providing pprof address below)
https://github.com/kubernetes-sigs/controller-runtime/blob/aea2e32a936584b06ae6f7992f856fe7292b0297/pkg/manager/manager.go#L407

If listener exists k8s tries to add HealthProbeServer:
https://github.com/kubernetes-sigs/controller-runtime/blob/aea2e32a936584b06ae6f7992f856fe7292b0297/pkg/manager/internal.go#L400

addHealthProverServer() definition - creates a new server:
https://github.com/kubernetes-sigs/controller-runtime/blob/aea2e32a936584b06ae6f7992f856fe7292b0297/pkg/manager/internal.go#L290

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I agree with Vamsi. Controller Manager's implementation isn't fit for purpose here, so we should just use the standard library and write the two HTTP handlers on whatever server is bound to 10093. It will probably provide a better healthcheck overall, since we're testing whether traffic can be served on 10093.


# volume mounts with name and mountPath
volumeMounts:
Expand Down
55 changes: 55 additions & 0 deletions pkg/enricher/enricher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,13 @@ package enricher

import (
"context"
"errors"
"io"
"reflect"
"sync"

"time"

"github.com/cilium/cilium/api/v1/flow"
v1 "github.com/cilium/cilium/pkg/hubble/api/v1"
"github.com/cilium/cilium/pkg/hubble/container"
Expand Down Expand Up @@ -189,3 +193,54 @@ func (e *Enricher) Write(ev *v1.Event) {
func (e *Enricher) ExportReader() *container.RingReader {
return container.NewRingReader(e.outputRing, e.outputRing.OldestWrite())
}

func (e *Enricher) Status() (float64, error) {
rate, err := e.getFlowRate(e.outputRing, time.Now())
if err != nil {
e.l.Error("failed to get flow rate %w", zap.Error(err))
return 0, err
}
return rate, nil
}

// ref: "getFlowRate" "github.com/cilium/cilium/pkg/hubble/observer/local_observer.go"
func (en *Enricher) getFlowRate(ring *container.Ring, at time.Time) (float64, error) {
reader := container.NewRingReader(ring, ring.LastWriteParallel())
count := 0
since := at.Add(-1 * time.Minute)
var lastSeenEvent *v1.Event
for {
e, err := reader.Previous()
lost := e.GetLostEvent()
if lost != nil {
// a lost event means we read the complete ring buffer
// if we read at least one flow, update `since` to calculate the rate over the available time range
if lastSeenEvent != nil {
since = lastSeenEvent.Timestamp.AsTime()
}
break
} else if errors.Is(err, io.EOF) {
// an EOF error means the ring buffer is empty, ignore error and continue
break
} else if err != nil {
// unexpected error
return 0, err
}
if _, isFlowEvent := e.Event.(*flow.Flow); !isFlowEvent {
// ignore non flow events
continue
}
if err := e.Timestamp.CheckValid(); err != nil {
return 0, err
}
ts := e.Timestamp.AsTime()
if ts.Before(since) {
// scanned the last minute, exit loop
break
}
lastSeenEvent = e
count++
}
fl := float64(count) / at.Sub(since).Seconds()
return fl, nil
}
mereta marked this conversation as resolved.
Show resolved Hide resolved
1 change: 1 addition & 0 deletions pkg/enricher/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ type EnricherInterface interface {
Run()
Write(ev *v1.Event)
ExportReader() *container.RingReader
Status() (float64, error)
}
4 changes: 4 additions & 0 deletions pkg/module/metrics/metrics_module.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ func InitModule(ctx context.Context,
return m
}

func (m *Module) Status() (float64, error) {
return m.enricher.Status()
}

func (m *Module) Reconcile(spec *api.MetricsSpec) error {
// If the new spec has not changed, then do nothing.
if m.currentSpec != nil && m.currentSpec.Equals(spec) {
Expand Down
Loading