Skip to content

Commit

Permalink
additional logging (#35)
Browse files Browse the repository at this point in the history
Co-authored-by: Radu Popovici <[email protected]>
  • Loading branch information
oncicaradupopovici and Radu Popovici authored Aug 1, 2022
1 parent 4e44064 commit 20f302c
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 26 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ require (
golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd // indirect
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect
golang.org/x/sys v0.0.0-20220307203707-22a9840ba4d7 // indirect
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -759,8 +759,9 @@ golang.org/x/sys v0.0.0-20210831042530-f4d43177bf5e/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220111092808-5a964db01320/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220307203707-22a9840ba4d7 h1:8IVLkfbr2cLhv0a/vKq4UFUcJym8RmDoDboxCFWEjYE=
golang.org/x/sys v0.0.0-20220307203707-22a9840ba4d7/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f h1:v4INt8xihDGvnrfjMDVXGxw9wrfxYyCjk0KbXjhR55s=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY=
Expand Down
48 changes: 25 additions & 23 deletions pkg/api/runtime/grpc/grpc_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,19 @@ import (
"context"
"errors"
"fmt"
"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
"k8s.io/klog/v2"
"net"
"rusi/pkg/api/runtime"
"rusi/pkg/messaging"
"rusi/pkg/messaging/serdes"
v1 "rusi/pkg/proto/runtime/v1"
"sync"

"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
"k8s.io/klog/v2"
)

func NewGrpcAPI(port int, serverOptions ...grpc.ServerOption) runtime.Api {
Expand Down Expand Up @@ -133,7 +134,7 @@ func (srv *rusiServerImpl) Subscribe(stream v1.Rusi_SubscribeServer) error {
exit := false
subId := uuid.NewString()
defer srv.removeRefreshChan(subId)
handler := srv.buildSubscribeHandler(stream)
handler := srv.buildSubscribeHandler(stream, request)
for {
hCtx, hCancel := context.WithCancel(context.Background())
unsub, err := srv.subscribeHandler(hCtx, messaging.SubscribeRequest{
Expand Down Expand Up @@ -177,23 +178,23 @@ type subAck struct {
errCh chan error
}

func (srv *rusiServerImpl) buildSubscribeHandler(stream v1.Rusi_SubscribeServer) func(context.Context) messaging.Handler {
func (srv *rusiServerImpl) buildSubscribeHandler(stream v1.Rusi_SubscribeServer, sr *v1.SubscriptionRequest) func(context.Context) messaging.Handler {

subAckMap := map[string]*subAck{}
mu := &sync.RWMutex{}

//monitor incoming ack stream for the current subscription
go startAckReceiverForStream(subAckMap, mu, stream)
go startAckReceiverForStream(subAckMap, mu, stream, sr)

return func(buildCtx context.Context) messaging.Handler {
return func(ctx context.Context, env *messaging.MessageEnvelope) error {
if env.Id == "" {
return errors.New("message id is missing")
return fmt.Errorf("message id is missing for topic %s", env.Subject)
}

errChan := make(chan error)
ackChan := make(chan error)
mu.Lock()
subAckMap[env.Id] = &subAck{nil, errChan}
subAckMap[env.Id] = &subAck{nil, ackChan}
mu.Unlock()
//cleanup
defer func() {
Expand All @@ -220,21 +221,21 @@ func (srv *rusiServerImpl) buildSubscribeHandler(stream v1.Rusi_SubscribeServer)
select {
//handler builder closed context
case <-buildCtx.Done():
klog.V(4).InfoS("Context done before ack", "message", buildCtx.Err())
klog.V(4).InfoS("Context done before ack", "message", buildCtx.Err(), "topic", env.Subject)
return buildCtx.Err()
//subscriber context is done
case <-ctx.Done():
klog.V(4).InfoS("Context done before ack", "message", ctx.Err())
klog.V(4).InfoS("Context done before ack", "message", ctx.Err(), "topic", env.Subject)
return ctx.Err()
case err = <-errChan:
case err = <-ackChan:
klog.V(4).InfoS("Ack sent to pubsub", "topic", env.Subject, "Id", env.Id, "error", err)
return err
}
}
}
}

func startAckReceiverForStream(subAckMap map[string]*subAck, mu *sync.RWMutex, stream v1.Rusi_SubscribeServer) {
func startAckReceiverForStream(subAckMap map[string]*subAck, mu *sync.RWMutex, stream v1.Rusi_SubscribeServer, sr *v1.SubscriptionRequest) {

//wait for ack from the client
for {
Expand All @@ -245,20 +246,21 @@ func startAckReceiverForStream(subAckMap map[string]*subAck, mu *sync.RWMutex, s
default:
r, err := stream.Recv() //blocks
if err != nil {
klog.V(4).ErrorS(err, "ack stream error")
klog.V(4).ErrorS(err, "ack stream error", "topic", sr.GetTopic())
break
}
if r.GetAckRequest() == nil {
klog.V(4).InfoS("invalid ack response")
ar := r.GetAckRequest()
if ar == nil {
klog.V(4).InfoS("invalid ack response", "topic", sr.GetTopic())
break
}
if r.GetAckRequest().GetError() != "" {
err = errors.New(r.GetAckRequest().GetError())
if ar.GetError() != "" {
err = errors.New(ar.GetError())
}

mu.RLock()
mid := r.GetAckRequest().GetMessageId()
klog.V(4).InfoS("Ack received for message", "Id", mid)
mid := ar.GetMessageId()
klog.V(4).InfoS("Ack received for message", "Id", mid, "topic", sr.GetTopic())
for id, ack := range subAckMap {
if id == mid {
if ack.ackHandler != nil {
Expand Down
7 changes: 6 additions & 1 deletion pkg/messaging/nats/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ import (
"context"
"errors"
"fmt"
"github.com/nats-io/nats.go"
"math/rand"
"rusi/pkg/healthcheck"
"rusi/pkg/messaging"
"rusi/pkg/messaging/serdes"
"strconv"
"time"

"github.com/nats-io/nats.go"

"k8s.io/klog/v2"

stan "github.com/nats-io/stan.go"
Expand Down Expand Up @@ -272,6 +273,10 @@ func (n *natsStreamingPubSub) Subscribe(topic string, handler messaging.Handler,
subs, err = n.natStreamingConn.QueueSubscribe(topic, n.options.natsQueueGroupName, natsMsgHandler, stanOptions...)
}

if err != nil {
klog.ErrorS(err, "nats-streaming: subscribe error", "topic", topic)
}

if err != nil || subs == nil {
return nil, fmt.Errorf("nats-streaming: subscribe error %s", err)
}
Expand Down

0 comments on commit 20f302c

Please sign in to comment.