Skip to content

Commit

Permalink
refactor operator
Browse files Browse the repository at this point in the history
  • Loading branch information
lghinet committed Oct 14, 2021
1 parent 105dc94 commit e1a9316
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 51 deletions.
24 changes: 13 additions & 11 deletions pkg/messaging/nats/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,17 +236,19 @@ func (n *natsStreamingPubSub) Subscribe(topic string, handler messaging.Handler,
if msg.Id == "" {
msg.Id = strconv.FormatUint(natsMsg.Sequence, 10)
}
klog.InfoS("Processing NATS Streaming message", "topic", natsMsg.Subject,
"Id", msg.Id)

err = handler(context.Background(), &msg)
if err == nil {
// we only send a successful ACK if there is no error
natsMsg.Ack()
klog.V(4).InfoS("Message manually acknowledged in NATS")
} else {
klog.ErrorS(err, "Error running subscriber pipeline")
}
klog.InfoS("Received message", "topic", natsMsg.Subject, "Id", msg.Id)

//run handler concurrently
go func() {
err = handler(context.Background(), &msg)
if err == nil {
// we only send a successful ACK if there is no error
natsMsg.Ack()
klog.V(4).InfoS("Message manually acknowledged in NATS")
} else {
klog.ErrorS(err, "Error running subscriber pipeline, message was not ACK")
}
}()
}

var subs stan.Subscription
Expand Down
100 changes: 60 additions & 40 deletions pkg/operator/operator.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
package operator

import (
"context"
"fmt"
"google.golang.org/grpc"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"net"
"rusi/pkg/kube"
compv1 "rusi/pkg/operator/apis/components/v1alpha1"
configv1 "rusi/pkg/operator/apis/configuration/v1alpha1"
"rusi/pkg/operator/client/clientset/versioned"
"rusi/pkg/operator/client/informers/externalversions"
operatorv1 "rusi/pkg/proto/operator/v1"
"sync"
)
Expand All @@ -27,15 +26,18 @@ type objectWatcher struct {
}

func Run() {
s := grpc.NewServer()

cfg := kube.GetConfig()
client, _ := versioned.NewForConfig(cfg)
ow := newObjectWatcher()
factory := externalversions.NewSharedInformerFactory(client, 0)

go ow.startWatchingForConfigurations(context.Background(), client)
go ow.startWatchingForComponents(context.Background(), client)
ow := newObjectWatcher()
ow.startWatchingForConfigurations(factory)
ow.startWatchingForComponents(factory)

stopper := make(chan struct{})
defer close(stopper)
factory.Start(stopper)
s := grpc.NewServer()
operatorv1.RegisterRusiOperatorServer(s, &operatorServer{ow})

lis, err := net.Listen("tcp", fmt.Sprintf(":%v", serverPort))
Expand All @@ -58,22 +60,31 @@ func newObjectWatcher() *objectWatcher {
}
}

func (ow *objectWatcher) startWatchingForComponents(ctx context.Context, client *versioned.Clientset) {
watcher, err := client.ComponentsV1alpha1().Components("").Watch(ctx, v1.ListOptions{})
if err != nil {
klog.ErrorS(err, "error creating components watcher")
return
}
for event := range watcher.ResultChan() {
klog.V(4).InfoS("received component", "event", event.Type, "object", event.Object)
if event.Type != watch.Error {
comp := event.Object.(*compv1.Component)
ow.mu.Lock()
ow.componentsMap[string(comp.UID)] = *comp
ow.mu.Unlock()
ow.componentChange(*comp)
}
}
func (ow *objectWatcher) startWatchingForComponents(factory externalversions.SharedInformerFactory) {
informer := factory.Components().V1alpha1().Components().Informer()
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
comp := obj.(*compv1.Component)
klog.V(4).InfoS("component added", "name", comp.Name, "namespace", comp.Namespace)
ow.updateComponent(*comp)
},
UpdateFunc: func(oldObj, newObj interface{}) {
comp := newObj.(*compv1.Component)
klog.V(4).InfoS("component updated", "name", comp.Name, "namespace", comp.Namespace)
ow.updateComponent(*comp)
},
DeleteFunc: func(obj interface{}) {
comp := obj.(*compv1.Component)
klog.V(4).InfoS("component deleted", "name", comp.Name, "namespace", comp.Namespace)
},
})
}

func (ow *objectWatcher) updateComponent(comp compv1.Component) {
ow.mu.Lock()
ow.componentsMap[string(comp.UID)] = comp
ow.mu.Unlock()
ow.componentChange(comp)
}

func (ow *objectWatcher) componentChange(comp compv1.Component) {
Expand All @@ -89,22 +100,31 @@ func (ow *objectWatcher) componentChange(comp compv1.Component) {
}
}

func (ow *objectWatcher) startWatchingForConfigurations(ctx context.Context, client *versioned.Clientset) {
watcher, err := client.ConfigurationV1alpha1().Configurations("").Watch(ctx, v1.ListOptions{})
if err != nil {
klog.ErrorS(err, "error creating configurations watcher")
return
}
for event := range watcher.ResultChan() {
klog.V(4).InfoS("received configuration", "event", event.Type, "object", event.Object)
if event.Type != watch.Error {
comp := event.Object.(*configv1.Configuration)
ow.mu.Lock()
ow.configurationsMap[string(comp.UID)] = *comp
ow.mu.Unlock()
ow.configurationChange(*comp)
}
}
func (ow *objectWatcher) startWatchingForConfigurations(factory externalversions.SharedInformerFactory) {
informer := factory.Configuration().V1alpha1().Configurations().Informer()
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
comp := obj.(*configv1.Configuration)
klog.V(4).InfoS("configuration added", "name", comp.Name, "namespace", comp.Namespace)
ow.updateConfiguration(*comp)
},
UpdateFunc: func(oldObj, newObj interface{}) {
comp := newObj.(*configv1.Configuration)
klog.V(4).InfoS("configuration updated", "name", comp.Name, "namespace", comp.Namespace)
ow.updateConfiguration(*comp)
},
DeleteFunc: func(obj interface{}) {
comp := obj.(*configv1.Configuration)
klog.V(4).InfoS("configuration deleted", "name", comp.Name, "namespace", comp.Namespace)
},
})
}

func (ow *objectWatcher) updateConfiguration(comp configv1.Configuration) {
ow.mu.Lock()
ow.configurationsMap[string(comp.UID)] = comp
ow.mu.Unlock()
ow.configurationChange(comp)
}

func (ow *objectWatcher) configurationChange(comp configv1.Configuration) {
Expand Down

0 comments on commit e1a9316

Please sign in to comment.