From 6c8db445dfabbfadd2a29890e8d968ed238ea27a Mon Sep 17 00:00:00 2001 From: darktech Date: Mon, 18 Oct 2021 11:06:57 +0300 Subject: [PATCH] fix operator disconnects --- cmd/rusid/sidecar.go | 27 +++++++------ internal/tracing/tracing.go | 16 +++++--- pkg/api/runtime/grpc/grpc_api.go | 2 +- .../configuration/loader/loader.go | 2 +- .../configuration/loader/local.go | 40 ++++++++++--------- .../configuration/loader/local_test.go | 8 ++-- pkg/operator/client.go | 28 +++++++------ pkg/runtime/components_manager.go | 1 + pkg/runtime/runtime.go | 7 ++-- pkg/runtime/runtime_test.go | 4 +- 10 files changed, 75 insertions(+), 60 deletions(-) diff --git a/cmd/rusid/sidecar.go b/cmd/rusid/sidecar.go index eb01874..7964a02 100644 --- a/cmd/rusid/sidecar.go +++ b/cmd/rusid/sidecar.go @@ -15,11 +15,13 @@ import ( "rusi/pkg/modes" "rusi/pkg/operator" "rusi/pkg/runtime" + "sync" "time" ) func main() { mainCtx, cancel := context.WithCancel(context.Background()) + wg := &sync.WaitGroup{} //https://github.com/kubernetes/community/blob/master/contributors/devel/sig-instrumentation/logging.md klog.InitFlags(nil) @@ -34,19 +36,14 @@ func main() { return } compLoader := components_loader.LoadLocalComponents(cfg.ComponentsPath) - configLoader := configuration_loader.LoadStandaloneConfiguration + configLoader := configuration_loader.LoadStandaloneConfiguration(cfg.Config) if cfg.Mode == modes.KubernetesMode { - compLoader = operator.GetComponentsWatcher(cfg.ControlPlaneAddress) - configLoader = operator.GetConfigurationWatcher(cfg.ControlPlaneAddress) - } - - configChan, err := configLoader(mainCtx, cfg.Config) - if err != nil { - klog.Fatal(err) + compLoader = operator.GetComponentsWatcher(mainCtx, cfg.ControlPlaneAddress, wg) + configLoader = operator.GetConfigurationWatcher(mainCtx, cfg.ControlPlaneAddress, cfg.Config, wg) } //setup tracing - go tracing.WatchConfig(mainCtx, configChan, tracing.SetJaegerTracing, "dev", cfg.AppID) + go tracing.WatchConfig(mainCtx, configLoader, tracing.SetJaegerTracing, "dev", cfg.AppID) compManager, err := runtime.NewComponentsManager(mainCtx, cfg.AppID, compLoader, RegisterComponentFactories()...) @@ -54,6 +51,7 @@ func main() { klog.Error(err) return } + api := grpc_api.NewGrpcAPI(cfg.RusiGRPCPort) rt, err := runtime.NewRuntime(mainCtx, cfg, api, configLoader, compManager) if err != nil { @@ -65,8 +63,8 @@ func main() { "app id", cfg.AppID, "mode", cfg.Mode) klog.InfoS("Rusid is using", "config", cfg) - //Healthz server - go startHealthzServer(mainCtx, cfg.HealthzPort, + //Start healthz server + go startHealthzServer(mainCtx, wg, cfg.HealthzPort, // WithTimeout allows you to set a max overall timeout. healthcheck.WithTimeout(5*time.Second), healthcheck.WithChecker("component manager", compManager)) @@ -77,6 +75,8 @@ func main() { if err != nil { klog.Error(err) } + + wg.Wait() // wait for app to close gracefully } func shutdownOnInterrupt(cancel func()) { @@ -91,7 +91,10 @@ func shutdownOnInterrupt(cancel func()) { } -func startHealthzServer(ctx context.Context, healthzPort int, options ...healthcheck.Option) { +func startHealthzServer(ctx context.Context, wg *sync.WaitGroup, healthzPort int, options ...healthcheck.Option) { + wg.Add(1) + defer wg.Done() + if err := healthcheck.Run(ctx, healthzPort, options...); err != nil { if err != http.ErrServerClosed { klog.ErrorS(err, "failed to start healthz server") diff --git a/internal/tracing/tracing.go b/internal/tracing/tracing.go index dd41a09..16a07fc 100644 --- a/internal/tracing/tracing.go +++ b/internal/tracing/tracing.go @@ -7,11 +7,11 @@ import ( tracesdk "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/trace" "k8s.io/klog/v2" - "rusi/pkg/custom-resource/configuration" + configuration_loader "rusi/pkg/custom-resource/configuration/loader" "time" ) -func WatchConfig(mainCtx context.Context, configChan <-chan configuration.Spec, +func WatchConfig(ctx context.Context, configLoader configuration_loader.ConfigurationLoader, tracerFunc func(url, environment, serviceName string) (*tracesdk.TracerProvider, error), environment, serviceName string) { @@ -20,13 +20,19 @@ func WatchConfig(mainCtx context.Context, configChan <-chan configuration.Spec, prevEndpointAddresss string tp *tracesdk.TracerProvider ) + + configChan, err := configLoader(ctx) + if err != nil { + klog.ErrorS(err, "error loading application config") + } + for cfg := range configChan { if prevEndpointAddresss == cfg.TracingSpec.Zipkin.EndpointAddresss { continue } if tp != nil { //flush prev logs - FlushTracer(tp)(mainCtx) + FlushTracer(tp)(ctx) } if cfg.TracingSpec.Zipkin.EndpointAddresss != "" { tp, err = tracerFunc(cfg.TracingSpec.Zipkin.EndpointAddresss, environment, serviceName) @@ -38,7 +44,7 @@ func WatchConfig(mainCtx context.Context, configChan <-chan configuration.Spec, } if tp != nil { //flush prev logs - FlushTracer(tp)(mainCtx) + FlushTracer(tp)(ctx) } } @@ -57,7 +63,7 @@ func FlushTracer(tp *tracesdk.TracerProvider) func(ctx context.Context) { ctx, cancel := context.WithTimeout(ctx, time.Second*5) defer cancel() if err := tp.Shutdown(ctx); err != nil { - klog.Fatal(err) + klog.ErrorS(err, "Tracer shutdown error") } } } diff --git a/pkg/api/runtime/grpc/grpc_api.go b/pkg/api/runtime/grpc/grpc_api.go index e0e20ba..b7f7494 100644 --- a/pkg/api/runtime/grpc/grpc_api.go +++ b/pkg/api/runtime/grpc/grpc_api.go @@ -56,7 +56,7 @@ func (srv *grpcApi) Serve(ctx context.Context) error { go func() { select { case <-ctx.Done(): - grpcServer.Stop() + grpcServer.GracefulStop() } }() diff --git a/pkg/custom-resource/configuration/loader/loader.go b/pkg/custom-resource/configuration/loader/loader.go index 829fbd8..21fdde1 100644 --- a/pkg/custom-resource/configuration/loader/loader.go +++ b/pkg/custom-resource/configuration/loader/loader.go @@ -5,4 +5,4 @@ import ( "rusi/pkg/custom-resource/configuration" ) -type ConfigurationLoader func(ctx context.Context, name string) (<-chan configuration.Spec, error) +type ConfigurationLoader func(ctx context.Context) (<-chan configuration.Spec, error) diff --git a/pkg/custom-resource/configuration/loader/local.go b/pkg/custom-resource/configuration/loader/local.go index 5db86d7..70b4f7f 100644 --- a/pkg/custom-resource/configuration/loader/local.go +++ b/pkg/custom-resource/configuration/loader/local.go @@ -25,29 +25,31 @@ func LoadDefaultConfiguration() configuration.Spec { } // LoadStandaloneConfiguration gets the path to a config file and loads it into a configuration. -func LoadStandaloneConfiguration(ctx context.Context, config string) (<-chan configuration.Spec, error) { - spec := LoadDefaultConfiguration() - c := make(chan configuration.Spec) +func LoadStandaloneConfiguration(config string) func(ctx context.Context) (<-chan configuration.Spec, error) { + return func(ctx context.Context) (<-chan configuration.Spec, error) { + spec := LoadDefaultConfiguration() + c := make(chan configuration.Spec) - _, err := os.Stat(config) - if err != nil { - return c, err - } + _, err := os.Stat(config) + if err != nil { + return c, err + } - b, err := ioutil.ReadFile(config) - if err != nil { - return c, err - } + b, err := ioutil.ReadFile(config) + if err != nil { + return c, err + } - // Parse environment variables from yaml - b = []byte(os.ExpandEnv(string(b))) + // Parse environment variables from yaml + b = []byte(os.ExpandEnv(string(b))) - cfg := configuration.Configuration{Spec: spec} - err = yaml.Unmarshal(b, &cfg) + cfg := configuration.Configuration{Spec: spec} + err = yaml.Unmarshal(b, &cfg) - go func() { - c <- cfg.Spec - }() + go func() { + c <- cfg.Spec + }() - return c, err + return c, err + } } diff --git a/pkg/custom-resource/configuration/loader/local_test.go b/pkg/custom-resource/configuration/loader/local_test.go index 3c69d8b..77daa23 100644 --- a/pkg/custom-resource/configuration/loader/local_test.go +++ b/pkg/custom-resource/configuration/loader/local_test.go @@ -56,7 +56,7 @@ spec: for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - _, err := LoadStandaloneConfiguration(ctx, tc.path) + _, err := LoadStandaloneConfiguration(tc.path)(ctx) if tc.errorExpected { assert.Error(t, err, "Expected an error") } else { @@ -67,7 +67,7 @@ spec: t.Run("Parse environment variables", func(t *testing.T) { os.Setenv("RUSI_ZIPKIN_ENDPOINT", "http://localhost:42323") - configChan, err := LoadStandaloneConfiguration(ctx, "env_variables_config.yaml") + configChan, err := LoadStandaloneConfiguration("env_variables_config.yaml")(ctx) config := <-configChan assert.NoError(t, err, "Unexpected error") assert.NotNil(t, config, "Config not loaded as expected") @@ -75,7 +75,7 @@ spec: }) t.Run("Load config file", func(t *testing.T) { - configChan, err := LoadStandaloneConfiguration(ctx, "config.yaml") + configChan, err := LoadStandaloneConfiguration("config.yaml")(ctx) config := <-configChan assert.NoError(t, err, "Unexpected error") assert.NotNil(t, config, "Config not loaded as expected") @@ -131,7 +131,7 @@ spec: ctx := context.Background() for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - configChan, err := LoadStandaloneConfiguration(ctx, tc.confFile) + configChan, err := LoadStandaloneConfiguration(tc.confFile)(ctx) config := <-configChan assert.NoError(t, err) assert.Equal(t, tc.featureEnabled, configuration.IsFeatureEnabled(config.Features, tc.featureName)) diff --git a/pkg/operator/client.go b/pkg/operator/client.go index 3f899e1..106e1d4 100644 --- a/pkg/operator/client.go +++ b/pkg/operator/client.go @@ -10,6 +10,7 @@ import ( "rusi/pkg/custom-resource/configuration" "rusi/pkg/kube" operatorv1 "rusi/pkg/proto/operator/v1" + "sync" ) var conn *grpc.ClientConn @@ -37,13 +38,10 @@ func newClient(ctx context.Context, address string) (cl operatorv1.RusiOperatorC return operatorv1.NewRusiOperatorClient(conn), nil } -func GetComponentsWatcher(address string) func(context.Context) (<-chan components.Spec, error) { +func GetComponentsWatcher(ctx context.Context, address string, wg *sync.WaitGroup) func(context.Context) (<-chan components.Spec, error) { + client, _ := newClient(ctx, address) return func(ctx context.Context) (<-chan components.Spec, error) { c := make(chan components.Spec) - client, err := newClient(ctx, address) - if err != nil { - return nil, err - } namespace := kube.GetCurrentNamespace() req := &operatorv1.WatchComponentsRequest{Namespace: namespace} stream, err := client.WatchComponents(ctx, req) @@ -51,9 +49,13 @@ func GetComponentsWatcher(address string) func(context.Context) (<-chan componen return nil, err } go func() { + wg.Add(1) + defer wg.Done() + defer close(c) for { select { case <-ctx.Done(): + klog.ErrorS(ctx.Err(), "watch components shutting down") return default: for { @@ -78,24 +80,24 @@ func GetComponentsWatcher(address string) func(context.Context) (<-chan componen } } -func GetConfigurationWatcher(address string) func(context.Context, string) (<-chan configuration.Spec, error) { - return func(ctx context.Context, name string) (<-chan configuration.Spec, error) { +func GetConfigurationWatcher(ctx context.Context, address, configName string, wg *sync.WaitGroup) func(context.Context) (<-chan configuration.Spec, error) { + client, _ := newClient(ctx, address) + return func(ctx context.Context) (<-chan configuration.Spec, error) { c := make(chan configuration.Spec) - client, err := newClient(ctx, address) - if err != nil { - return nil, err - } namespace := kube.GetCurrentNamespace() - req := &operatorv1.WatchConfigurationRequest{ConfigName: name, Namespace: namespace} + req := &operatorv1.WatchConfigurationRequest{ConfigName: configName, Namespace: namespace} stream, err := client.WatchConfiguration(ctx, req) - if err != nil { return nil, err } go func() { + wg.Add(1) + defer wg.Done() + defer close(c) for { select { case <-ctx.Done(): + klog.ErrorS(ctx.Err(), "watch configuration shutting down") return default: for { diff --git a/pkg/runtime/components_manager.go b/pkg/runtime/components_manager.go index f673753..738172c 100644 --- a/pkg/runtime/components_manager.go +++ b/pkg/runtime/components_manager.go @@ -70,6 +70,7 @@ func (m *ComponentsManager) watchComponentsUpdates() { klog.ErrorS(err, "Error loading ", "component", update.Name) } } + close(m.changeNotificationChan) } func (m *ComponentsManager) addOrUpdateComponent(spec components.Spec) (err error) { diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index 7b8ea28..1608640 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -32,7 +32,7 @@ func NewRuntime(ctx context.Context, config Config, api runtime_api.Api, configurationLoader configuration_loader.ConfigurationLoader, manager *ComponentsManager) (*runtime, error) { - configChan, err := configurationLoader(ctx, config.Config) + configChan, err := configurationLoader(ctx) if err != nil { klog.ErrorS(err, "error loading application config", "name", config.Config, "mode", config.Mode) @@ -62,10 +62,11 @@ func NewRuntime(ctx context.Context, config Config, api runtime_api.Api, func (rt *runtime) watchConfigurationUpdates() { for update := range rt.configurationUpdatesChan { if reflect.DeepEqual(rt.appConfig, update) { - return + klog.V(4).InfoS("configuration not changed") + continue } klog.InfoS("configuration changed") - klog.V(4).InfoS("configuration changed", "old config", rt.appConfig, "new config", update) + klog.V(4).InfoS("configuration details", "old", rt.appConfig, "new", update) rt.appConfig = update err := rt.api.Refresh() if err != nil { diff --git a/pkg/runtime/runtime_test.go b/pkg/runtime/runtime_test.go index 28e8503..5a001a0 100644 --- a/pkg/runtime/runtime_test.go +++ b/pkg/runtime/runtime_test.go @@ -36,8 +36,8 @@ func Test_runtime_PublishHandler(t *testing.T) { return request } - configLoader := func(channel chan configuration.Spec, err error, streamer func(channel chan configuration.Spec)) func(ctx context.Context, name string) (<-chan configuration.Spec, error) { - return func(ctx context.Context, name string) (<-chan configuration.Spec, error) { + configLoader := func(channel chan configuration.Spec, err error, streamer func(channel chan configuration.Spec)) func(ctx context.Context) (<-chan configuration.Spec, error) { + return func(ctx context.Context) (<-chan configuration.Spec, error) { go streamer(channel) return channel, err }