Skip to content

Commit

Permalink
refactor operator
Browse files Browse the repository at this point in the history
  • Loading branch information
lghinet committed Oct 14, 2021
1 parent eb17c2e commit 105dc94
Show file tree
Hide file tree
Showing 3 changed files with 170 additions and 40 deletions.
6 changes: 3 additions & 3 deletions pkg/operator/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package operator

import (
"context"
"encoding/json"
jsoniter "github.com/json-iterator/go"
"google.golang.org/grpc"
"io"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -59,7 +59,7 @@ func GetComponentsWatcher(address string) func(context.Context) (<-chan componen
break
}
spec := components.Spec{}
err = json.Unmarshal(msg.Data, &spec)
err = jsoniter.Unmarshal(msg.Data, &spec)
if err != nil {
klog.ErrorS(err, "unable to Unmarshal operator data ")
}
Expand Down Expand Up @@ -100,7 +100,7 @@ func GetConfigurationWatcher(address string) func(context.Context, string) (<-ch
break
}
spec := configuration.Spec{}
err = json.Unmarshal(msg.Data, &spec)
err = jsoniter.Unmarshal(msg.Data, &spec)
if err != nil {
klog.ErrorS(err, "unable to Unmarshal operator data ")
}
Expand Down
147 changes: 146 additions & 1 deletion pkg/operator/operator.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,42 @@
package operator

import (
"context"
"fmt"
"google.golang.org/grpc"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/klog/v2"
"net"
"rusi/pkg/kube"
compv1 "rusi/pkg/operator/apis/components/v1alpha1"
configv1 "rusi/pkg/operator/apis/configuration/v1alpha1"
"rusi/pkg/operator/client/clientset/versioned"
operatorv1 "rusi/pkg/proto/operator/v1"
"sync"
)

const serverPort = 6500

type objectWatcher struct {
componentsMap map[string]compv1.Component
configurationsMap map[string]configv1.Configuration
mu sync.RWMutex
compChans []chan compv1.Component
configChans []chan configv1.Configuration
}

func Run() {
s := grpc.NewServer()

cfg := kube.GetConfig()
client, _ := versioned.NewForConfig(cfg)
ow := newObjectWatcher()

go ow.startWatchingForConfigurations(context.Background(), client)
go ow.startWatchingForComponents(context.Background(), client)

operatorv1.RegisterRusiOperatorServer(s, &operatorServer{client})
operatorv1.RegisterRusiOperatorServer(s, &operatorServer{ow})

lis, err := net.Listen("tcp", fmt.Sprintf(":%v", serverPort))
if err != nil {
Expand All @@ -29,3 +47,130 @@ func Run() {
klog.Fatalf("gRPC server error: %v", err)
}
}

func newObjectWatcher() *objectWatcher {
return &objectWatcher{
componentsMap: map[string]compv1.Component{},
configurationsMap: map[string]configv1.Configuration{},
mu: sync.RWMutex{},
compChans: []chan compv1.Component{},
configChans: []chan configv1.Configuration{},
}
}

func (ow *objectWatcher) startWatchingForComponents(ctx context.Context, client *versioned.Clientset) {
watcher, err := client.ComponentsV1alpha1().Components("").Watch(ctx, v1.ListOptions{})
if err != nil {
klog.ErrorS(err, "error creating components watcher")
return
}
for event := range watcher.ResultChan() {
klog.V(4).InfoS("received component", "event", event.Type, "object", event.Object)
if event.Type != watch.Error {
comp := event.Object.(*compv1.Component)
ow.mu.Lock()
ow.componentsMap[string(comp.UID)] = *comp
ow.mu.Unlock()
ow.componentChange(*comp)
}
}
}

func (ow *objectWatcher) componentChange(comp compv1.Component) {
ow.mu.RLock()
defer ow.mu.RUnlock()
klog.V(4).InfoS("publishing component change", "subscribers", len(ow.compChans))

for _, c := range ow.compChans {
c := c
go func() {
c <- comp
}()
}
}

func (ow *objectWatcher) startWatchingForConfigurations(ctx context.Context, client *versioned.Clientset) {
watcher, err := client.ConfigurationV1alpha1().Configurations("").Watch(ctx, v1.ListOptions{})
if err != nil {
klog.ErrorS(err, "error creating configurations watcher")
return
}
for event := range watcher.ResultChan() {
klog.V(4).InfoS("received configuration", "event", event.Type, "object", event.Object)
if event.Type != watch.Error {
comp := event.Object.(*configv1.Configuration)
ow.mu.Lock()
ow.configurationsMap[string(comp.UID)] = *comp
ow.mu.Unlock()
ow.configurationChange(*comp)
}
}
}

func (ow *objectWatcher) configurationChange(comp configv1.Configuration) {
ow.mu.RLock()
defer ow.mu.RUnlock()
klog.V(4).InfoS("publishing configuration change", "subscribers", len(ow.configChans))

for _, c := range ow.configChans {
c := c
go func() {
c <- comp
}()
}
}

func (ow *objectWatcher) addComponentListener(c chan compv1.Component) {
ow.mu.Lock()
ow.compChans = append(ow.compChans, c)
ow.mu.Unlock()
go ow.replayComponents(c)
}
func (ow *objectWatcher) addConfigurationListener(c chan configv1.Configuration) {
ow.mu.Lock()
ow.configChans = append(ow.configChans, c)
ow.mu.Unlock()
go ow.replayConfigs(c)
}

func (ow *objectWatcher) replayConfigs(c chan configv1.Configuration) {
ow.mu.RLock()
defer ow.mu.RUnlock()
for _, item := range ow.configurationsMap {
c <- item
}
}

func (ow *objectWatcher) replayComponents(c chan compv1.Component) {
ow.mu.RLock()
defer ow.mu.RUnlock()
for _, item := range ow.componentsMap {
c <- item
}
}

func (ow *objectWatcher) removeComponentListener(c chan compv1.Component) {
ow.mu.Lock()
defer ow.mu.Unlock()

var list []chan compv1.Component
for _, compChan := range ow.compChans {
if compChan != c {
list = append(list, compChan)
}
}
ow.compChans = list
}

func (ow *objectWatcher) removeConfigurationListener(c chan configv1.Configuration) {
ow.mu.Lock()
defer ow.mu.Unlock()

var list []chan configv1.Configuration
for _, compChan := range ow.configChans {
if compChan != c {
list = append(list, compChan)
}
}
ow.configChans = list
}
57 changes: 21 additions & 36 deletions pkg/operator/server.go
Original file line number Diff line number Diff line change
@@ -1,44 +1,33 @@
package operator

import (
"encoding/json"
"errors"
"github.com/google/uuid"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
jsoniter "github.com/json-iterator/go"
"k8s.io/klog/v2"
"rusi/pkg/custom-resource/components"
compv1 "rusi/pkg/operator/apis/components/v1alpha1"
configv1 "rusi/pkg/operator/apis/configuration/v1alpha1"
"rusi/pkg/operator/client/clientset/versioned"
operatorv1 "rusi/pkg/proto/operator/v1"
"strings"
)

type operatorServer struct {
client *versioned.Clientset
ow *objectWatcher
}

func (opsrv *operatorServer) WatchConfiguration(request *operatorv1.WatchConfigurationRequest, stream operatorv1.RusiOperator_WatchConfigurationServer) error {
watcher, err := opsrv.client.ConfigurationV1alpha1().Configurations(request.Namespace).Watch(stream.Context(), v1.ListOptions{})
if err != nil {
return err
}
c := make(chan configv1.Configuration)
opsrv.ow.addConfigurationListener(c)
defer opsrv.ow.removeConfigurationListener(c)

for {
select {
case event, ok := <-watcher.ResultChan():
if !ok {
return errors.New("kubernetes configuration watcher closed")
}
klog.V(4).InfoS("received configuration", "event", event.Type, "object", event.Object)
if event.Type != watch.Error {
obj := event.Object.(*configv1.Configuration)
if obj.Name == request.ConfigName {
b, _ := json.Marshal(obj.Spec)
stream.Send(&operatorv1.GenericItem{
Data: b,
})
}
case obj := <-c:
if obj.Namespace == request.Namespace && obj.Name == request.ConfigName {
b, _ := jsoniter.Marshal(obj.Spec)
stream.Send(&operatorv1.GenericItem{
Data: b,
})
}
case <-stream.Context().Done():
klog.V(4).ErrorS(stream.Context().Err(), "grpc WatchConfiguration stream closed")
Expand All @@ -48,19 +37,15 @@ func (opsrv *operatorServer) WatchConfiguration(request *operatorv1.WatchConfigu
}

func (opsrv *operatorServer) WatchComponents(request *operatorv1.WatchComponentsRequest, stream operatorv1.RusiOperator_WatchComponentsServer) error {
watcher, err := opsrv.client.ComponentsV1alpha1().Components(request.Namespace).Watch(stream.Context(), v1.ListOptions{})
if err != nil {
return err
}
c := make(chan compv1.Component)
opsrv.ow.addComponentListener(c)
defer opsrv.ow.removeComponentListener(c)

for {
select {
case event, ok := <-watcher.ResultChan():
if !ok {
return errors.New("kubernetes components watcher closed")
}
klog.V(4).InfoS("received component", "event", event.Type, "object", event.Object)
if event.Type != watch.Error {
b, _ := json.Marshal(convertToComponent(event))
case obj := <-c:
if obj.Namespace == request.Namespace {
b, _ := jsoniter.Marshal(convertToComponent(obj))
stream.Send(&operatorv1.GenericItem{
Data: b,
})
Expand All @@ -71,8 +56,8 @@ func (opsrv *operatorServer) WatchComponents(request *operatorv1.WatchComponents
}
}
}
func convertToComponent(event watch.Event) components.Spec {
item := event.Object.(*compv1.Component)

func convertToComponent(item compv1.Component) components.Spec {
return components.Spec{
Name: item.Name,
Type: item.Spec.Type,
Expand Down

0 comments on commit 105dc94

Please sign in to comment.