Skip to content

Commit

Permalink
more operator logs
Browse files Browse the repository at this point in the history
  • Loading branch information
lghinet committed Oct 14, 2021
1 parent aa537eb commit eb17c2e
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 62 deletions.
6 changes: 4 additions & 2 deletions pkg/operator/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func GetComponentsWatcher(address string) func(context.Context) (<-chan componen
msg, err := stream.Recv()
if err == io.EOF {
//done <- true
klog.V(4).Info("watch components grpc stream EOF")
klog.Warning("watch components grpc stream EOF")
break
}
if err != nil {
Expand All @@ -65,6 +65,7 @@ func GetComponentsWatcher(address string) func(context.Context) (<-chan componen
}
c <- spec
}
klog.Warning("watch components grpc stream closed")
close(c)
conn.Close()
}()
Expand All @@ -91,7 +92,7 @@ func GetConfigurationWatcher(address string) func(context.Context, string) (<-ch
msg, err := stream.Recv()
if err == io.EOF {
//done <- true
klog.V(4).Info("watch configuration grpc stream EOF")
klog.Warning("watch configuration grpc stream EOF")
break
}
if err != nil {
Expand All @@ -105,6 +106,7 @@ func GetConfigurationWatcher(address string) func(context.Context, string) (<-ch
}
c <- spec
}
klog.Warning("watch configuration grpc stream closed")
close(c)
conn.Close()
}()
Expand Down
98 changes: 38 additions & 60 deletions pkg/operator/server.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package operator

import (
"context"
"encoding/json"
"errors"
"github.com/google/uuid"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
Expand All @@ -20,88 +20,66 @@ type operatorServer struct {
}

func (opsrv *operatorServer) WatchConfiguration(request *operatorv1.WatchConfigurationRequest, stream operatorv1.RusiOperator_WatchConfigurationServer) error {
c, err := listConfiguration(stream.Context(), opsrv.client, request.ConfigName, request.Namespace)
watcher, err := opsrv.client.ConfigurationV1alpha1().Configurations(request.Namespace).Watch(stream.Context(), v1.ListOptions{})
if err != nil {
return err
}
for {
select {
case data := <-c:
b, _ := json.Marshal(data)
stream.Send(&operatorv1.GenericItem{
Data: b,
})
case event, ok := <-watcher.ResultChan():
if !ok {
return errors.New("kubernetes configuration watcher closed")
}
klog.V(4).InfoS("received configuration", "event", event.Type, "object", event.Object)
if event.Type != watch.Error {
obj := event.Object.(*configv1.Configuration)
if obj.Name == request.ConfigName {
b, _ := json.Marshal(obj.Spec)
stream.Send(&operatorv1.GenericItem{
Data: b,
})
}
}
case <-stream.Context().Done():
klog.V(4).ErrorS(stream.Context().Err(), "grpc WatchConfiguration stream closed")
return nil
}
}
}

func (opsrv *operatorServer) WatchComponents(request *operatorv1.WatchComponentsRequest, stream operatorv1.RusiOperator_WatchComponentsServer) error {
c, err := listComponents(stream.Context(), opsrv.client, request.Namespace)
watcher, err := opsrv.client.ComponentsV1alpha1().Components(request.Namespace).Watch(stream.Context(), v1.ListOptions{})
if err != nil {
return err
}
for {
select {
case data := <-c:
b, _ := json.Marshal(data)
stream.Send(&operatorv1.GenericItem{
Data: b,
})
case event, ok := <-watcher.ResultChan():
if !ok {
return errors.New("kubernetes components watcher closed")
}
klog.V(4).InfoS("received component", "event", event.Type, "object", event.Object)
if event.Type != watch.Error {
b, _ := json.Marshal(convertToComponent(event))
stream.Send(&operatorv1.GenericItem{
Data: b,
})
}
case <-stream.Context().Done():
klog.V(4).ErrorS(stream.Context().Err(), "grpc WatchComponents stream closed")
return nil
}
}
}

func listComponents(ctx context.Context, client *versioned.Clientset, namespace string) (<-chan components.Spec, error) {
c := make(chan components.Spec)
watcher, err := client.ComponentsV1alpha1().Components(namespace).Watch(ctx, v1.ListOptions{})
if err != nil {
return c, err
}
go func() {
for conf := range watcher.ResultChan() {
klog.V(4).InfoS("received component", "event", conf.Type, "object", conf.Object)
if conf.Type == watch.Error {
continue
}
item := conf.Object.(*compv1.Component)
c <- components.Spec{
Name: item.Name,
Type: item.Spec.Type,
Version: item.Spec.Version,
Metadata: convertMetadataItemsToProperties(item.Spec.Metadata),
Scopes: item.Scopes,
}
}
}()
return c, nil
}

func listConfiguration(ctx context.Context, client *versioned.Clientset, name string, namespace string) (<-chan configv1.ConfigurationSpec, error) {
c := make(chan configv1.ConfigurationSpec)
watcher, err := client.ConfigurationV1alpha1().Configurations(namespace).Watch(ctx, v1.ListOptions{})
if err != nil {
return c, err
func convertToComponent(event watch.Event) components.Spec {
item := event.Object.(*compv1.Component)
return components.Spec{
Name: item.Name,
Type: item.Spec.Type,
Version: item.Spec.Version,
Metadata: convertMetadataItemsToProperties(item.Spec.Metadata),
Scopes: item.Scopes,
}

go func() {
for conf := range watcher.ResultChan() {
klog.V(4).InfoS("received configuration", "event", conf.Type, "object", conf.Object)
if conf.Type == watch.Error {
continue
}
obj := conf.Object.(*configv1.Configuration)
if obj.Name != name {
continue
}
c <- obj.Spec
}
}()

return c, err
}

func convertMetadataItemsToProperties(items []compv1.MetadataItem) map[string]string {
Expand Down

0 comments on commit eb17c2e

Please sign in to comment.