From e0aeff3eb2fa80ab9ef77943905f63f7a087407d Mon Sep 17 00:00:00 2001 From: darktech Date: Fri, 15 Oct 2021 11:50:55 +0300 Subject: [PATCH] fix healtcheck --- cmd/rusid/sidecar.go | 28 ++++++++++++++++++++++------ pkg/api/runtime/api.go | 7 +++++-- pkg/api/runtime/grpc/grpc_api.go | 9 ++++++++- pkg/api/runtime/test_api.go | 3 ++- pkg/runtime/runtime.go | 4 ++-- 5 files changed, 39 insertions(+), 12 deletions(-) diff --git a/cmd/rusid/sidecar.go b/cmd/rusid/sidecar.go index 1f65707..c864b5d 100644 --- a/cmd/rusid/sidecar.go +++ b/cmd/rusid/sidecar.go @@ -4,6 +4,8 @@ import ( "context" "flag" "k8s.io/klog/v2" + "os" + "os/signal" "rusi/internal/tracing" grpc_api "rusi/pkg/api/runtime/grpc" components_loader "rusi/pkg/custom-resource/components/loader" @@ -16,7 +18,7 @@ import ( ) func main() { - mainCtx := context.Background() + mainCtx, cancel := context.WithCancel(context.Background()) //https://github.com/kubernetes/community/blob/master/contributors/devel/sig-instrumentation/logging.md klog.InitFlags(nil) @@ -62,20 +64,34 @@ func main() { "app id", cfg.AppID, "mode", cfg.Mode) klog.InfoS("Rusid is using", "config", cfg) - //setup HealthzServer - startHealthzServer(cfg.HealthzPort, + //Healthz server + go startHealthzServer(mainCtx, cfg.HealthzPort, // WithTimeout allows you to set a max overall timeout. healthcheck.WithTimeout(5*time.Second), healthcheck.WithChecker("component manager", compManager)) - err = rt.Run() + shutdownOnInterrupt(cancel) + + err = rt.Run(mainCtx) //blocks if err != nil { klog.Error(err) } } -func startHealthzServer(healthzPort int, options ...healthcheck.Option) { - if err := healthcheck.Run(context.Background(), healthzPort, options...); err != nil { +func shutdownOnInterrupt(cancel func()) { + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + + go func() { + <-c + klog.InfoS("Shutdown requested") + cancel() + }() + +} + +func startHealthzServer(ctx context.Context, healthzPort int, options ...healthcheck.Option) { + if err := healthcheck.Run(ctx, healthzPort, options...); err != nil { klog.Fatalf("failed to start healthz server: %s", err) } } diff --git a/pkg/api/runtime/api.go b/pkg/api/runtime/api.go index 32782ee..10449c1 100644 --- a/pkg/api/runtime/api.go +++ b/pkg/api/runtime/api.go @@ -1,9 +1,12 @@ package runtime -import "rusi/pkg/messaging" +import ( + "context" + "rusi/pkg/messaging" +) type Api interface { - Serve() error + Serve(ctx context.Context) error Refresh() error SetPublishHandler(messaging.PublishRequestHandler) SetSubscribeHandler(messaging.SubscribeRequestHandler) diff --git a/pkg/api/runtime/grpc/grpc_api.go b/pkg/api/runtime/grpc/grpc_api.go index e9275a5..0fe3ea9 100644 --- a/pkg/api/runtime/grpc/grpc_api.go +++ b/pkg/api/runtime/grpc/grpc_api.go @@ -44,7 +44,7 @@ func (srv *grpcApi) Refresh() error { return srv.server.Refresh() } -func (srv *grpcApi) Serve() error { +func (srv *grpcApi) Serve(ctx context.Context) error { grpcServer := grpc.NewServer(srv.serverOptions...) v1.RegisterRusiServer(grpcServer, srv.server) @@ -53,6 +53,13 @@ func (srv *grpcApi) Serve() error { klog.Fatalf("failed to listen: %v", err) } + go func() { + select { + case <-ctx.Done(): + grpcServer.Stop() + } + }() + return grpcServer.Serve(lis) } diff --git a/pkg/api/runtime/test_api.go b/pkg/api/runtime/test_api.go index 7078cd5..34e4846 100644 --- a/pkg/api/runtime/test_api.go +++ b/pkg/api/runtime/test_api.go @@ -1,6 +1,7 @@ package runtime import ( + "context" "rusi/pkg/messaging" ) @@ -14,7 +15,7 @@ func NewTestApi() *TestApi { return &TestApi{RefreshChan: make(chan bool)} } -func (TestApi) Serve() error { +func (TestApi) Serve(ctx context.Context) error { return nil } diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index fb6627e..7b8ea28 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -140,6 +140,6 @@ func (rt *runtime) SubscribeHandler(ctx context.Context, request messaging.Subsc return srv.StartSubscribing(request) } -func (rt *runtime) Run() error { - return rt.api.Serve() +func (rt *runtime) Run(ctx context.Context) error { + return rt.api.Serve(ctx) }