Skip to content

Commit

Permalink
add rusid health checks for pubsub
Browse files Browse the repository at this point in the history
  • Loading branch information
lghinet committed Oct 15, 2021
1 parent e1a9316 commit 88a463c
Show file tree
Hide file tree
Showing 11 changed files with 314 additions and 68 deletions.
16 changes: 15 additions & 1 deletion cmd/rusid/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ import (
grpc_api "rusi/pkg/api/runtime/grpc"
components_loader "rusi/pkg/custom-resource/components/loader"
configuration_loader "rusi/pkg/custom-resource/configuration/loader"
"rusi/pkg/healthcheck"
"rusi/pkg/modes"
"rusi/pkg/operator"
"rusi/pkg/runtime"
"time"
)

func main() {
Expand All @@ -21,7 +23,7 @@ func main() {
defer klog.Flush()

cfgBuilder := runtime.NewRuntimeConfigBuilder()
cfgBuilder.AttachCmdFlags(flag.StringVar, flag.BoolVar)
cfgBuilder.AttachCmdFlags(flag.StringVar, flag.BoolVar, flag.IntVar)
flag.Parse()
cfg, err := cfgBuilder.Build()
if err != nil {
Expand Down Expand Up @@ -60,8 +62,20 @@ func main() {
"app id", cfg.AppID, "mode", cfg.Mode)
klog.InfoS("Rusid is using", "config", cfg)

//setup HealthzServer
startHealthzServer(cfg.HealthzPort,
// WithTimeout allows you to set a max overall timeout.
healthcheck.WithTimeout(5*time.Second),
healthcheck.WithChecker("component manager", compManager))

err = rt.Run()
if err != nil {
klog.Error(err)
}
}

// startHealthzServer runs the healthz HTTP server on healthzPort with the
// supplied health-check options. The call blocks for as long as
// healthcheck.Run does and terminates the process through klog.Fatalf when
// Run returns an error.
func startHealthzServer(healthzPort int, options ...healthcheck.Option) {
	err := healthcheck.Run(context.Background(), healthzPort, options...)
	if err == nil {
		return
	}
	klog.Fatalf("failed to start healthz server: %s", err)
}
1 change: 0 additions & 1 deletion pkg/api/runtime/api_errors.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package runtime

const (
// PubSub.
ErrPubsubNotConfigured = "no pubsub is configured"
ErrPubsubEmpty = "pubsub name is empty"
ErrPubsubNotFound = "pubsub %s not found"
Expand Down
6 changes: 3 additions & 3 deletions pkg/api/runtime/grpc/grpc_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"k8s.io/klog/v2"
)

func NewGrpcAPI(port string, serverOptions ...grpc.ServerOption) runtime.Api {
func NewGrpcAPI(port int, serverOptions ...grpc.ServerOption) runtime.Api {

srv := &rusiServerImpl{
refreshChannels: []chan bool{},
Expand All @@ -29,7 +29,7 @@ func NewGrpcAPI(port string, serverOptions ...grpc.ServerOption) runtime.Api {
}

type grpcApi struct {
port string
port int
server *rusiServerImpl
serverOptions []grpc.ServerOption
}
Expand All @@ -48,7 +48,7 @@ func (srv *grpcApi) Serve() error {
grpcServer := grpc.NewServer(srv.serverOptions...)
v1.RegisterRusiServer(grpcServer, srv.server)

lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%s", srv.port))
lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%v", srv.port))
if err != nil {
klog.Fatalf("failed to listen: %v", err)
}
Expand Down
111 changes: 111 additions & 0 deletions pkg/healthcheck/health.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package healthcheck

import (
"context"
"encoding/json"
"net/http"
"sync"
"time"
)

// response is the JSON document written by the health handler.
type response struct {
// Status carries the textual form of the HTTP status code that was sent.
Status string `json:"status,omitempty"`
// Errors maps a checker name to the description it reported; the field is
// omitted from the JSON output when it is empty.
Errors map[string]string `json:"errors,omitempty"`
}

// health is the http.Handler implementation behind Handler and HandlerFunc.
type health struct {
// checkers holds the registered health checkers, keyed by name.
checkers map[string]HealthChecker
// timeout bounds a whole health request; a value <= 0 disables the bound.
timeout time.Duration
}

// Handler builds the health-check http.Handler. It starts with no checkers
// and a 30 second overall timeout, then applies every option in the order
// given.
func Handler(opts ...Option) http.Handler {
	hc := new(health)
	hc.checkers = map[string]HealthChecker{}
	hc.timeout = 30 * time.Second

	for i := range opts {
		opts[i](hc)
	}
	return hc
}

// HandlerFunc is the http.HandlerFunc flavour of Handler, convenient for
// mounting the health endpoint on a specific route.
func HandlerFunc(opts ...Option) http.HandlerFunc {
	handler := Handler(opts...)
	return handler.ServeHTTP
}

// Option configures the health handler built by Handler or HandlerFunc.
// Options are applied to the handler in the order they are passed.
type Option func(*health)

// WithChecker registers s under name so that it takes part in every health
// request (for example a database, a cache or any other external dependency).
// The checker is wrapped so that it gives up once the request context is done.
// Registering a second checker under the same name replaces the first.
func WithChecker(name string, s HealthChecker) Option {
	return func(target *health) {
		wrapped := &timeoutChecker{checker: s}
		target.checkers[name] = wrapped
	}
}

// WithTimeout sets the overall time budget shared by all checkers during a
// single health request.
func WithTimeout(timeout time.Duration) Option {
	var apply Option = func(target *health) {
		target.timeout = timeout
	}
	return apply
}

// ServeHTTP runs every registered checker concurrently and writes the
// aggregated outcome as JSON.
//
// The response is 200 when all checkers report Healthy (or none are
// registered). Any other status, Degraded included, turns the response into
// 503 and adds the checker's description to the "errors" object under the
// checker's name.
//
// The checks run under the request's context, additionally bounded by the
// configured timeout when it is positive, so they are abandoned as soon as
// the client goes away or the time budget is spent.
func (h *health) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	nCheckers := len(h.checkers)

	code := http.StatusOK
	errorMsgs := make(map[string]string, nCheckers)

	// Derive from the request context so a disconnected client cancels the
	// outstanding checks instead of letting them run to the full timeout.
	ctx, cancel := r.Context(), func() {}
	if h.timeout > 0 {
		ctx, cancel = context.WithTimeout(ctx, h.timeout)
	}
	defer cancel()

	// mutex guards code and errorMsgs, which the checker goroutines share.
	var mutex sync.Mutex
	var wg sync.WaitGroup
	wg.Add(nCheckers)

	for key, checker := range h.checkers {
		go func(key string, checker HealthChecker) {
			defer wg.Done()
			if res := checker.IsHealthy(ctx); res.Status != Healthy {
				mutex.Lock()
				errorMsgs[key] = res.Description
				code = http.StatusServiceUnavailable
				mutex.Unlock()
			}
		}(key, checker)
	}

	wg.Wait()

	w.Header().Set("Content-Type", "application/json; charset=utf-8")
	w.WriteHeader(code)
	// The status line is already on the wire, so an encoding or write failure
	// (typically a closed connection) cannot be reported to the client.
	_ = json.NewEncoder(w).Encode(response{
		Status: http.StatusText(code),
		Errors: errorMsgs,
	})
}

// timeoutChecker decorates a HealthChecker so that a check is reported as
// Unhealthy when the context ends before the wrapped checker answers.
type timeoutChecker struct {
// checker is the wrapped checker that performs the actual check.
checker HealthChecker
}

// IsHealthy runs the wrapped checker in its own goroutine and returns its
// result, unless ctx is done first, in which case an Unhealthy result with
// the description "max check time exceeded" is returned.
func (t *timeoutChecker) IsHealthy(ctx context.Context) HealthResult {
	// Buffered with capacity 1 so the worker can always deliver its result
	// and exit, even when nobody is receiving any more because ctx fired
	// first. An unbuffered channel would leak that goroutine forever.
	checkerChan := make(chan HealthResult, 1)
	go func() {
		checkerChan <- t.checker.IsHealthy(ctx)
	}()

	select {
	case r := <-checkerChan:
		return r
	case <-ctx.Done():
		return HealthResult{
			Status:      Unhealthy,
			Description: "max check time exceeded",
		}
	}
}
43 changes: 43 additions & 0 deletions pkg/healthcheck/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package healthcheck

import (
"context"
"fmt"
"k8s.io/klog/v2"
"net/http"
"time"
)

// Run serves the health endpoint at /healthz on the given port and blocks
// until the server stops.
//
// Cancelling ctx triggers a graceful shutdown that is given up to five
// seconds to complete; Run waits for that shutdown to finish and then returns
// nil. Any other reason for the server stopping (for example the port being
// unavailable) is logged and returned as an error.
func Run(ctx context.Context, port int, options ...Option) error {
	router := http.NewServeMux()
	router.Handle("/healthz", HandlerFunc(options...))

	srv := &http.Server{
		Addr:    fmt.Sprintf(":%d", port),
		Handler: router,
		// Bound the time a client may take to send its request headers so
		// idle connections cannot pin server resources indefinitely.
		ReadHeaderTimeout: 5 * time.Second,
	}

	// doneCh tells the watcher goroutine that ListenAndServe has returned.
	doneCh := make(chan struct{})
	// shutdownDone is closed once the watcher goroutine has finished,
	// including any Shutdown call it made.
	shutdownDone := make(chan struct{})

	go func() {
		defer close(shutdownDone)
		select {
		case <-ctx.Done():
			klog.Info("Healthz server is shutting down")
			shutdownCtx, cancel := context.WithTimeout(
				context.Background(),
				time.Second*5,
			)
			defer cancel()
			srv.Shutdown(shutdownCtx) // nolint: errcheck
		case <-doneCh:
		}
	}()

	klog.Infof("Healthz server is listening on %s", srv.Addr)
	err := srv.ListenAndServe()
	close(doneCh)
	// ListenAndServe returns as soon as Shutdown is called; wait here so that
	// in-flight requests are drained before Run reports completion.
	<-shutdownDone

	if err == http.ErrServerClosed {
		// Graceful shutdown is the expected way to stop, not a failure.
		return nil
	}
	klog.ErrorS(err, "Healthz server error")
	return err
}
31 changes: 31 additions & 0 deletions pkg/healthcheck/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package healthcheck

import "context"

// HealthChecker is implemented by anything whose health can be probed.
type HealthChecker interface {
// IsHealthy performs the check and reports the outcome as a HealthResult.
IsHealthy(ctx context.Context) HealthResult
}

// CheckerFunc is an adapter that lets an ordinary function with the matching
// signature be used as a HealthChecker.
type CheckerFunc func(ctx context.Context) HealthResult

// IsHealthy satisfies the HealthChecker interface by invoking the function
// itself with ctx and handing back whatever it returns.
func (fn CheckerFunc) IsHealthy(ctx context.Context) HealthResult {
	result := fn(ctx)
	return result
}

// HealthStatus is the outcome category of a single health check.
type HealthStatus int

// The possible HealthStatus values. Every constant is declared as a
// HealthStatus (via iota's implicit repetition) so that none of them can be
// used as a plain int by accident. The numeric values are 0, 1 and 2.
const (
	// Unhealthy means the check failed. It is also the zero value.
	Unhealthy HealthStatus = iota
	// Degraded is the status between Unhealthy and Healthy.
	Degraded
	// Healthy means the check passed.
	Healthy
)

// HealthResult is what a HealthChecker reports back.
type HealthResult struct {
// Status is the outcome of the check.
Status HealthStatus
// Description is a human-readable explanation of the outcome.
Description string
}

// HealthyResult is a ready-made HealthResult with status Healthy and an
// empty description.
var HealthyResult = HealthResult{Status: Healthy}
Loading

0 comments on commit 88a463c

Please sign in to comment.