Skip to content

Commit

Permalink
refactor operator client
Browse files Browse the repository at this point in the history
  • Loading branch information
lghinet committed Oct 15, 2021
1 parent e0aeff3 commit fc14566
Showing 1 changed file with 58 additions and 48 deletions.
106 changes: 58 additions & 48 deletions pkg/operator/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,17 @@ import (
"context"
jsoniter "github.com/json-iterator/go"
"google.golang.org/grpc"
"io"
"google.golang.org/grpc/connectivity"
"k8s.io/klog/v2"
"rusi/pkg/custom-resource/components"
"rusi/pkg/custom-resource/configuration"
"rusi/pkg/kube"
operatorv1 "rusi/pkg/proto/operator/v1"
)

func newClient(ctx context.Context, address string) (operatorv1.RusiOperatorClient, *grpc.ClientConn, error) {
var conn *grpc.ClientConn

func newClient(ctx context.Context, address string) (cl operatorv1.RusiOperatorClient, err error) {
var retryPolicy = `{
"methodConfig": [{
"name": [{"service": "rusi.proto.operator.v1.RusiOperator"}],
Expand All @@ -26,48 +28,51 @@ func newClient(ctx context.Context, address string) (operatorv1.RusiOperatorClie
}
}]}`

conn, err := grpc.DialContext(ctx, address, grpc.WithInsecure(), grpc.WithDefaultServiceConfig(retryPolicy))
if err != nil {
return nil, nil, err
if conn == nil {
conn, err = grpc.DialContext(ctx, address, grpc.WithInsecure(), grpc.WithDefaultServiceConfig(retryPolicy))
if err != nil {
return nil, err
}
}
return operatorv1.NewRusiOperatorClient(conn), conn, nil
return operatorv1.NewRusiOperatorClient(conn), nil
}

func GetComponentsWatcher(address string) func(context.Context) (<-chan components.Spec, error) {
return func(ctx context.Context) (<-chan components.Spec, error) {
c := make(chan components.Spec)
client, conn, err := newClient(ctx, address)
client, err := newClient(ctx, address)
if err != nil {
return nil, err
}
namespace := kube.GetCurrentNamespace()
stream, err := client.WatchComponents(ctx, &operatorv1.WatchComponentsRequest{Namespace: namespace})
req := &operatorv1.WatchComponentsRequest{Namespace: namespace}
stream, err := client.WatchComponents(ctx, req)
if err != nil {
return nil, err
}
go func() {
for {
msg, err := stream.Recv()
if err == io.EOF {
//done <- true
klog.Warning("watch components grpc stream EOF")
break
}
if err != nil {
//reconnect <- true
klog.ErrorS(err, "watch components grpc stream lost")
break
select {
case <-ctx.Done():
return
default:
for {
msg, err := stream.Recv()
if err != nil {
klog.ErrorS(err, "watch components grpc stream error")
break
}
spec := components.Spec{}
err = jsoniter.Unmarshal(msg.Data, &spec)
if err != nil {
klog.ErrorS(err, "unable to Unmarshal operator data ")
}
c <- spec
}
klog.Warning("watch components grpc stream closed, reconnecting...")
stream, _ = client.WatchComponents(ctx, req)
}
spec := components.Spec{}
err = jsoniter.Unmarshal(msg.Data, &spec)
if err != nil {
klog.ErrorS(err, "unable to Unmarshal operator data ")
}
c <- spec
}
klog.Warning("watch components grpc stream closed")
close(c)
conn.Close()
}()
return c, nil
}
Expand All @@ -76,40 +81,45 @@ func GetComponentsWatcher(address string) func(context.Context) (<-chan componen
func GetConfigurationWatcher(address string) func(context.Context, string) (<-chan configuration.Spec, error) {
return func(ctx context.Context, name string) (<-chan configuration.Spec, error) {
c := make(chan configuration.Spec)
client, conn, err := newClient(ctx, address)
client, err := newClient(ctx, address)
if err != nil {
return nil, err
}
namespace := kube.GetCurrentNamespace()
stream, err := client.WatchConfiguration(ctx, &operatorv1.WatchConfigurationRequest{
ConfigName: name, Namespace: namespace})
req := &operatorv1.WatchConfigurationRequest{ConfigName: name, Namespace: namespace}
stream, err := client.WatchConfiguration(ctx, req)

if err != nil {
return nil, err
}
go func() {
for {
msg, err := stream.Recv()
if err == io.EOF {
//done <- true
klog.Warning("watch configuration grpc stream EOF")
break
}
if err != nil {
klog.ErrorS(err, "watch configuration grpc stream lost")
break
}
spec := configuration.Spec{}
err = jsoniter.Unmarshal(msg.Data, &spec)
if err != nil {
klog.ErrorS(err, "unable to Unmarshal operator data ")
select {
case <-ctx.Done():
return
default:
for {
msg, err := stream.Recv()
if err != nil {
klog.ErrorS(err, "watch configuration grpc stream error")
break
}
spec := configuration.Spec{}
err = jsoniter.Unmarshal(msg.Data, &spec)
if err != nil {
klog.ErrorS(err, "unable to Unmarshal operator data ")
}
c <- spec
}
klog.Warning("watch configuration grpc stream closed, reconnecting ...")
stream, _ = client.WatchConfiguration(ctx, req)
}
c <- spec
}
klog.Warning("watch configuration grpc stream closed")
close(c)
conn.Close()
}()
return c, nil
}
}

func IsOperatorClientAlive() bool {
return conn != nil && conn.GetState() == connectivity.Ready
}

0 comments on commit fc14566

Please sign in to comment.