Skip to content

Commit

Permalink
fix operator disconnects
Browse files Browse the repository at this point in the history
  • Loading branch information
lghinet committed Oct 18, 2021
1 parent ce0628e commit 6c8db44
Show file tree
Hide file tree
Showing 10 changed files with 75 additions and 60 deletions.
27 changes: 15 additions & 12 deletions cmd/rusid/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ import (
"rusi/pkg/modes"
"rusi/pkg/operator"
"rusi/pkg/runtime"
"sync"
"time"
)

func main() {
mainCtx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}

//https://github.com/kubernetes/community/blob/master/contributors/devel/sig-instrumentation/logging.md
klog.InitFlags(nil)
Expand All @@ -34,26 +36,22 @@ func main() {
return
}
compLoader := components_loader.LoadLocalComponents(cfg.ComponentsPath)
configLoader := configuration_loader.LoadStandaloneConfiguration
configLoader := configuration_loader.LoadStandaloneConfiguration(cfg.Config)
if cfg.Mode == modes.KubernetesMode {
compLoader = operator.GetComponentsWatcher(cfg.ControlPlaneAddress)
configLoader = operator.GetConfigurationWatcher(cfg.ControlPlaneAddress)
}

configChan, err := configLoader(mainCtx, cfg.Config)
if err != nil {
klog.Fatal(err)
compLoader = operator.GetComponentsWatcher(mainCtx, cfg.ControlPlaneAddress, wg)
configLoader = operator.GetConfigurationWatcher(mainCtx, cfg.ControlPlaneAddress, cfg.Config, wg)
}

//setup tracing
go tracing.WatchConfig(mainCtx, configChan, tracing.SetJaegerTracing, "dev", cfg.AppID)
go tracing.WatchConfig(mainCtx, configLoader, tracing.SetJaegerTracing, "dev", cfg.AppID)

compManager, err := runtime.NewComponentsManager(mainCtx, cfg.AppID, compLoader,
RegisterComponentFactories()...)
if err != nil {
klog.Error(err)
return
}

api := grpc_api.NewGrpcAPI(cfg.RusiGRPCPort)
rt, err := runtime.NewRuntime(mainCtx, cfg, api, configLoader, compManager)
if err != nil {
Expand All @@ -65,8 +63,8 @@ func main() {
"app id", cfg.AppID, "mode", cfg.Mode)
klog.InfoS("Rusid is using", "config", cfg)

//Healthz server
go startHealthzServer(mainCtx, cfg.HealthzPort,
//Start healthz server
go startHealthzServer(mainCtx, wg, cfg.HealthzPort,
// WithTimeout allows you to set a max overall timeout.
healthcheck.WithTimeout(5*time.Second),
healthcheck.WithChecker("component manager", compManager))
Expand All @@ -77,6 +75,8 @@ func main() {
if err != nil {
klog.Error(err)
}

wg.Wait() // wait for app to close gracefully
}

func shutdownOnInterrupt(cancel func()) {
Expand All @@ -91,7 +91,10 @@ func shutdownOnInterrupt(cancel func()) {

}

func startHealthzServer(ctx context.Context, healthzPort int, options ...healthcheck.Option) {
func startHealthzServer(ctx context.Context, wg *sync.WaitGroup, healthzPort int, options ...healthcheck.Option) {
wg.Add(1)
defer wg.Done()

if err := healthcheck.Run(ctx, healthzPort, options...); err != nil {
if err != http.ErrServerClosed {
klog.ErrorS(err, "failed to start healthz server")
Expand Down
16 changes: 11 additions & 5 deletions internal/tracing/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ import (
tracesdk "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
"k8s.io/klog/v2"
"rusi/pkg/custom-resource/configuration"
configuration_loader "rusi/pkg/custom-resource/configuration/loader"
"time"
)

func WatchConfig(mainCtx context.Context, configChan <-chan configuration.Spec,
func WatchConfig(ctx context.Context, configLoader configuration_loader.ConfigurationLoader,
tracerFunc func(url, environment, serviceName string) (*tracesdk.TracerProvider, error),
environment, serviceName string) {

Expand All @@ -20,13 +20,19 @@ func WatchConfig(mainCtx context.Context, configChan <-chan configuration.Spec,
prevEndpointAddresss string
tp *tracesdk.TracerProvider
)

configChan, err := configLoader(ctx)
if err != nil {
klog.ErrorS(err, "error loading application config")
}

for cfg := range configChan {
if prevEndpointAddresss == cfg.TracingSpec.Zipkin.EndpointAddresss {
continue
}
if tp != nil {
//flush prev logs
FlushTracer(tp)(mainCtx)
FlushTracer(tp)(ctx)
}
if cfg.TracingSpec.Zipkin.EndpointAddresss != "" {
tp, err = tracerFunc(cfg.TracingSpec.Zipkin.EndpointAddresss, environment, serviceName)
Expand All @@ -38,7 +44,7 @@ func WatchConfig(mainCtx context.Context, configChan <-chan configuration.Spec,
}
if tp != nil {
//flush prev logs
FlushTracer(tp)(mainCtx)
FlushTracer(tp)(ctx)
}
}

Expand All @@ -57,7 +63,7 @@ func FlushTracer(tp *tracesdk.TracerProvider) func(ctx context.Context) {
ctx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()
if err := tp.Shutdown(ctx); err != nil {
klog.Fatal(err)
klog.ErrorS(err, "Tracer shutdown error")
}
}
}
2 changes: 1 addition & 1 deletion pkg/api/runtime/grpc/grpc_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (srv *grpcApi) Serve(ctx context.Context) error {
go func() {
select {
case <-ctx.Done():
grpcServer.Stop()
grpcServer.GracefulStop()
}
}()

Expand Down
2 changes: 1 addition & 1 deletion pkg/custom-resource/configuration/loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ import (
"rusi/pkg/custom-resource/configuration"
)

type ConfigurationLoader func(ctx context.Context, name string) (<-chan configuration.Spec, error)
type ConfigurationLoader func(ctx context.Context) (<-chan configuration.Spec, error)
40 changes: 21 additions & 19 deletions pkg/custom-resource/configuration/loader/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,29 +25,31 @@ func LoadDefaultConfiguration() configuration.Spec {
}

// LoadStandaloneConfiguration gets the path to a config file and loads it into a configuration.
func LoadStandaloneConfiguration(ctx context.Context, config string) (<-chan configuration.Spec, error) {
spec := LoadDefaultConfiguration()
c := make(chan configuration.Spec)
func LoadStandaloneConfiguration(config string) func(ctx context.Context) (<-chan configuration.Spec, error) {
return func(ctx context.Context) (<-chan configuration.Spec, error) {
spec := LoadDefaultConfiguration()
c := make(chan configuration.Spec)

_, err := os.Stat(config)
if err != nil {
return c, err
}
_, err := os.Stat(config)
if err != nil {
return c, err
}

b, err := ioutil.ReadFile(config)
if err != nil {
return c, err
}
b, err := ioutil.ReadFile(config)
if err != nil {
return c, err
}

// Parse environment variables from yaml
b = []byte(os.ExpandEnv(string(b)))
// Parse environment variables from yaml
b = []byte(os.ExpandEnv(string(b)))

cfg := configuration.Configuration{Spec: spec}
err = yaml.Unmarshal(b, &cfg)
cfg := configuration.Configuration{Spec: spec}
err = yaml.Unmarshal(b, &cfg)

go func() {
c <- cfg.Spec
}()
go func() {
c <- cfg.Spec
}()

return c, err
return c, err
}
}
8 changes: 4 additions & 4 deletions pkg/custom-resource/configuration/loader/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ spec:

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
_, err := LoadStandaloneConfiguration(ctx, tc.path)
_, err := LoadStandaloneConfiguration(tc.path)(ctx)
if tc.errorExpected {
assert.Error(t, err, "Expected an error")
} else {
Expand All @@ -67,15 +67,15 @@ spec:

t.Run("Parse environment variables", func(t *testing.T) {
os.Setenv("RUSI_ZIPKIN_ENDPOINT", "http://localhost:42323")
configChan, err := LoadStandaloneConfiguration(ctx, "env_variables_config.yaml")
configChan, err := LoadStandaloneConfiguration("env_variables_config.yaml")(ctx)
config := <-configChan
assert.NoError(t, err, "Unexpected error")
assert.NotNil(t, config, "Config not loaded as expected")
assert.Equal(t, "http://localhost:42323", config.TracingSpec.Zipkin.EndpointAddresss)
})

t.Run("Load config file", func(t *testing.T) {
configChan, err := LoadStandaloneConfiguration(ctx, "config.yaml")
configChan, err := LoadStandaloneConfiguration("config.yaml")(ctx)
config := <-configChan
assert.NoError(t, err, "Unexpected error")
assert.NotNil(t, config, "Config not loaded as expected")
Expand Down Expand Up @@ -131,7 +131,7 @@ spec:
ctx := context.Background()
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
configChan, err := LoadStandaloneConfiguration(ctx, tc.confFile)
configChan, err := LoadStandaloneConfiguration(tc.confFile)(ctx)
config := <-configChan
assert.NoError(t, err)
assert.Equal(t, tc.featureEnabled, configuration.IsFeatureEnabled(config.Features, tc.featureName))
Expand Down
28 changes: 15 additions & 13 deletions pkg/operator/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"rusi/pkg/custom-resource/configuration"
"rusi/pkg/kube"
operatorv1 "rusi/pkg/proto/operator/v1"
"sync"
)

var conn *grpc.ClientConn
Expand Down Expand Up @@ -37,23 +38,24 @@ func newClient(ctx context.Context, address string) (cl operatorv1.RusiOperatorC
return operatorv1.NewRusiOperatorClient(conn), nil
}

func GetComponentsWatcher(address string) func(context.Context) (<-chan components.Spec, error) {
func GetComponentsWatcher(ctx context.Context, address string, wg *sync.WaitGroup) func(context.Context) (<-chan components.Spec, error) {
client, _ := newClient(ctx, address)
return func(ctx context.Context) (<-chan components.Spec, error) {
c := make(chan components.Spec)
client, err := newClient(ctx, address)
if err != nil {
return nil, err
}
namespace := kube.GetCurrentNamespace()
req := &operatorv1.WatchComponentsRequest{Namespace: namespace}
stream, err := client.WatchComponents(ctx, req)
if err != nil {
return nil, err
}
go func() {
wg.Add(1)
defer wg.Done()
defer close(c)
for {
select {
case <-ctx.Done():
klog.ErrorS(ctx.Err(), "watch components shutting down")
return
default:
for {
Expand All @@ -78,24 +80,24 @@ func GetComponentsWatcher(address string) func(context.Context) (<-chan componen
}
}

func GetConfigurationWatcher(address string) func(context.Context, string) (<-chan configuration.Spec, error) {
return func(ctx context.Context, name string) (<-chan configuration.Spec, error) {
func GetConfigurationWatcher(ctx context.Context, address, configName string, wg *sync.WaitGroup) func(context.Context) (<-chan configuration.Spec, error) {
client, _ := newClient(ctx, address)
return func(ctx context.Context) (<-chan configuration.Spec, error) {
c := make(chan configuration.Spec)
client, err := newClient(ctx, address)
if err != nil {
return nil, err
}
namespace := kube.GetCurrentNamespace()
req := &operatorv1.WatchConfigurationRequest{ConfigName: name, Namespace: namespace}
req := &operatorv1.WatchConfigurationRequest{ConfigName: configName, Namespace: namespace}
stream, err := client.WatchConfiguration(ctx, req)

if err != nil {
return nil, err
}
go func() {
wg.Add(1)
defer wg.Done()
defer close(c)
for {
select {
case <-ctx.Done():
klog.ErrorS(ctx.Err(), "watch configuration shutting down")
return
default:
for {
Expand Down
1 change: 1 addition & 0 deletions pkg/runtime/components_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func (m *ComponentsManager) watchComponentsUpdates() {
klog.ErrorS(err, "Error loading ", "component", update.Name)
}
}
close(m.changeNotificationChan)
}

func (m *ComponentsManager) addOrUpdateComponent(spec components.Spec) (err error) {
Expand Down
7 changes: 4 additions & 3 deletions pkg/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func NewRuntime(ctx context.Context, config Config, api runtime_api.Api,
configurationLoader configuration_loader.ConfigurationLoader,
manager *ComponentsManager) (*runtime, error) {

configChan, err := configurationLoader(ctx, config.Config)
configChan, err := configurationLoader(ctx)
if err != nil {
klog.ErrorS(err, "error loading application config", "name",
config.Config, "mode", config.Mode)
Expand Down Expand Up @@ -62,10 +62,11 @@ func NewRuntime(ctx context.Context, config Config, api runtime_api.Api,
func (rt *runtime) watchConfigurationUpdates() {
for update := range rt.configurationUpdatesChan {
if reflect.DeepEqual(rt.appConfig, update) {
return
klog.V(4).InfoS("configuration not changed")
continue
}
klog.InfoS("configuration changed")
klog.V(4).InfoS("configuration changed", "old config", rt.appConfig, "new config", update)
klog.V(4).InfoS("configuration details", "old", rt.appConfig, "new", update)
rt.appConfig = update
err := rt.api.Refresh()
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/runtime/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ func Test_runtime_PublishHandler(t *testing.T) {
return request
}

configLoader := func(channel chan configuration.Spec, err error, streamer func(channel chan configuration.Spec)) func(ctx context.Context, name string) (<-chan configuration.Spec, error) {
return func(ctx context.Context, name string) (<-chan configuration.Spec, error) {
configLoader := func(channel chan configuration.Spec, err error, streamer func(channel chan configuration.Spec)) func(ctx context.Context) (<-chan configuration.Spec, error) {
return func(ctx context.Context) (<-chan configuration.Spec, error) {
go streamer(channel)
return channel, err
}
Expand Down

0 comments on commit 6c8db44

Please sign in to comment.