Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
lghinet committed Oct 18, 2021
1 parent ce4f4b5 commit ce0628e
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 38 deletions.
5 changes: 4 additions & 1 deletion cmd/rusid/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"flag"
"k8s.io/klog/v2"
"net/http"
"os"
"os/signal"
"rusi/internal/tracing"
Expand Down Expand Up @@ -92,6 +93,8 @@ func shutdownOnInterrupt(cancel func()) {

func startHealthzServer(ctx context.Context, healthzPort int, options ...healthcheck.Option) {
if err := healthcheck.Run(ctx, healthzPort, options...); err != nil {
klog.Fatalf("failed to start healthz server: %s", err)
if err != http.ErrServerClosed {
klog.ErrorS(err, "failed to start healthz server")
}
}
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
go.opentelemetry.io/otel/exporters/jaeger v1.0.0-RC3
go.opentelemetry.io/otel/sdk v1.0.0-RC3
go.opentelemetry.io/otel/trace v1.0.0-RC3
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
google.golang.org/grpc v1.40.0
google.golang.org/protobuf v1.27.1
gopkg.in/yaml.v2 v2.4.0
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,7 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
4 changes: 2 additions & 2 deletions pkg/api/runtime/grpc/grpc_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ func (srv *rusiServerImpl) Subscribe(subscribeServer v1.Rusi_SubscribeServer) er
defer cancel()
exit := false
refreshChan := srv.createRefreshChan()
defer srv.removeRefreshChan(refreshChan)
handler := srv.buildSubscribeHandler(subscribeServer)
for {
hCtx, hCancel := context.WithCancel(ctx)
Expand Down Expand Up @@ -144,7 +145,6 @@ func (srv *rusiServerImpl) Subscribe(subscribeServer v1.Rusi_SubscribeServer) er
hCancel()
_ = unsub()
if exit {
srv.removeRefreshChan(refreshChan)
return ctx.Err()
}
}
Expand Down Expand Up @@ -279,7 +279,7 @@ func (srv *rusiServerImpl) Publish(ctx context.Context, request *v1.PublishReque
})

if err != nil {
klog.V(4).Info(err)
klog.ErrorS(err, "error on publishing")
err = status.Errorf(codes.Unknown, err.Error())
}
return &emptypb.Empty{}, err
Expand Down
26 changes: 14 additions & 12 deletions pkg/api/runtime/grpc/grpc_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"github.com/google/uuid"
"github.com/pkg/errors"
"k8s.io/klog/v2"
"net"
"reflect"
"rusi/pkg/messaging"
Expand Down Expand Up @@ -48,6 +49,8 @@ func Test_grpc_to_messaging_subscriptionOptions(t *testing.T) {
}

func Test_RusiServer_Pubsub(t *testing.T) {
l := klog.Level(4)
l.Set("4")
store := messaging.NewInMemoryBus()
publishHandler := func(ctx context.Context, request messaging.PublishRequest) error {
return store.Publish(request.Topic, &messaging.MessageEnvelope{
Expand Down Expand Up @@ -117,15 +120,15 @@ func Test_RusiServer_Pubsub(t *testing.T) {
})
assert.NoError(t, err, "subscribers count does not match")

go client.Publish(ctx, tt.publishRequest)
client.Publish(ctx, tt.publishRequest)
msg1, err := stream.Recv() //blocks
assert.NoError(t, err)
var data string
_ = serdes.Unmarshal(msg1.GetData(), &data)
assert.Equal(t, tt.wantData, data)
assert.Equal(t, tt.wantMetadata, msg1.GetMetadata())

go client.Publish(ctx, tt.publishRequest)
client.Publish(ctx, tt.publishRequest)
msg2, err := stream.Recv() //blocks

stream.Send(createAckRequest(msg1.Id, ""))
Expand Down Expand Up @@ -160,7 +163,7 @@ func Test_RusiServer_Pubsub(t *testing.T) {
return store.GetSubscribersCount(topic) == 1
})
assert.NoError(t, err, "subscribers count does not match")
go client.Publish(ctx, pubRequest)
client.Publish(ctx, pubRequest)
var msg1, msg2 *v1.ReceivedMessage
err = wait(func() error {
msg1, err = stream.Recv() //blocks
Expand All @@ -177,7 +180,7 @@ func Test_RusiServer_Pubsub(t *testing.T) {
return store.GetSubscribersCount(topic) == 1
})
assert.NoError(t, err)
go client.Publish(ctx, pubRequest)
client.Publish(ctx, pubRequest)
msg2, err = stream.Recv() //blocks
assert.NotNil(t, msg2)
assert.NoError(t, err)
Expand Down Expand Up @@ -217,8 +220,7 @@ func Test_RusiServer_Pubsub(t *testing.T) {
return store.GetSubscribersCount(topic) == 1
})
assert.NoError(t, err, "subscribers count does not match")
go client.Publish(ctx, pubRequest)
time.Sleep(100 * time.Millisecond)
client.Publish(ctx, pubRequest)
err = wait(func() error {
return server.Refresh()
}, "timeout waiting for server refresh")
Expand All @@ -234,7 +236,7 @@ func Test_RusiServer_Pubsub(t *testing.T) {
return err
}, "timeout waiting for receiving message on stream")
assert.NoError(t, err)
go client.Publish(ctx, pubRequest)
client.Publish(ctx, pubRequest)
assert.Nil(t, err)
msg, err := stream.Recv() //blocks
assert.NotNil(t, msg)
Expand Down Expand Up @@ -284,7 +286,7 @@ func Test_RusiServer_Pubsub(t *testing.T) {
return store.GetSubscribersCount(topic) == 1
})
assert.NoError(t, err, "subscribers count does not match")
go client.Publish(ctx, pubRequest)
client.Publish(ctx, pubRequest)

err = wait(func() error {
return server.Refresh()
Expand Down Expand Up @@ -358,8 +360,8 @@ func Test_RusiServer_Pubsub(t *testing.T) {
})
assert.NoError(t, err, "subscribers count does not match")

go client.Publish(ctx, pubRequest)
go client.Publish(ctx, pubRequest)
client.Publish(ctx, pubRequest)
client.Publish(ctx, pubRequest)
var msg1, msg2 *v1.ReceivedMessage
err = wait(func() error {
msg1, _ = stream.Recv() //blocks
Expand Down Expand Up @@ -402,7 +404,7 @@ func Test_RusiServer_Pubsub(t *testing.T) {
})
assert.NoError(t, err, "subscribers count does not match")

go client.Publish(ctx, pubRequest)
client.Publish(ctx, pubRequest)
var msg *v1.ReceivedMessage
err = wait(func() error {
msg, _ = stream.Recv() //blocks
Expand Down Expand Up @@ -444,7 +446,7 @@ func Test_RusiServer_Pubsub(t *testing.T) {
})
assert.NoError(t, err, "subscribers count does not match")

go client.Publish(ctx, pubRequest)
client.Publish(ctx, pubRequest)
err = wait(func() error {
s1.Recv() //blocks
s2.Recv() //blocks
Expand Down
46 changes: 23 additions & 23 deletions pkg/messaging/inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package messaging

import (
"context"
"fmt"
"golang.org/x/sync/errgroup"
"k8s.io/klog/v2"
"sync"
"sync/atomic"
Expand All @@ -20,21 +20,16 @@ func NewInMemoryBus() *inMemoryBus {
}

func (c *inMemoryBus) Publish(topic string, env *MessageEnvelope) error {
atomic.AddInt32(c.workingCounter, 1)
defer atomic.AddInt32(c.workingCounter, -1)
c.mu.RLock()
h := c.handlers[topic]
c.mu.RUnlock()
if env.Headers == nil {
env.Headers = map[string]string{}
}
env.Headers["topic"] = topic
println("Publish to topic " + topic)

println("start runHandlers for topic " + topic)
c.runHandlers(h, env)
println("finish runHandlers for topic " + topic)
klog.InfoS("Publish to topic " + topic)

go c.runHandlers(h, env)
return nil
}

Expand All @@ -44,7 +39,7 @@ func (c *inMemoryBus) Subscribe(topic string, handler Handler, options *Subscrip

handlerP := &handler
c.handlers[topic] = append(c.handlers[topic], handlerP)
println("Subscribed to topic " + topic)
klog.InfoS("Subscribed to topic " + topic)

return func() error {
c.mu.Lock()
Expand All @@ -57,7 +52,7 @@ func (c *inMemoryBus) Subscribe(topic string, handler Handler, options *Subscrip
}
c.handlers[topic] = s

println("unSubscribe from topic " + topic)
klog.InfoS("unSubscribe from topic " + topic)
return nil
}, nil
}
Expand All @@ -79,24 +74,29 @@ func (c *inMemoryBus) GetSubscribersCount(topic string) int {
return len(c.handlers[topic])
}

func (c *inMemoryBus) runHandlers(handlers []*Handler, env *MessageEnvelope) {
sg := sync.WaitGroup{}
sg.Add(len(handlers))
func (c *inMemoryBus) runHandlers(handlers []*Handler, env *MessageEnvelope) error {
atomic.AddInt32(c.workingCounter, 1)
defer atomic.AddInt32(c.workingCounter, -1)

klog.InfoS("start runHandlers for topic " + env.Subject)
eg := errgroup.Group{}
c.mu.RLock()
for i, h := range handlers {
h := h
i := i
println(fmt.Sprintf("starting Handler %d with metadata %v", i, env.Headers))
go func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*20)
klog.Infof("starting Handler %d with metadata %v", i, env.Headers)
eg.Go(func() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
defer sg.Done()
err := (*h)(ctx, env)
if err != nil {
klog.ErrorS(err, "error")
}
}()
return (*h)(ctx, env) //run handler blocks
})
}
c.mu.RUnlock()
sg.Wait()
err := eg.Wait()
if err != nil {
klog.ErrorS(err, "error running handlers")
} else {
klog.InfoS("finish runHandlers for topic " + env.Subject)
}
return err
}

0 comments on commit ce0628e

Please sign in to comment.