Skip to content

Commit

Permalink
change refreshChannels from slice to map
Browse files Browse the repository at this point in the history
  • Loading branch information
lghinet committed Mar 25, 2022
1 parent fe0ab6d commit adfbfb0
Showing 1 changed file with 16 additions and 20 deletions.
36 changes: 16 additions & 20 deletions pkg/api/runtime/grpc/grpc_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -70,7 +71,7 @@ type rusiServerImpl struct {
mu sync.RWMutex
mainCtx context.Context
subsWaitGroup *sync.WaitGroup
refreshChannels []chan bool
refreshChannels map[string]chan bool
publishHandler messaging.PublishRequestHandler
subscribeHandler messaging.SubscribeRequestHandler
}
Expand All @@ -79,7 +80,7 @@ func newRusiServer(ctx context.Context,
publishHandler messaging.PublishRequestHandler,
subscribeHandler messaging.SubscribeRequestHandler) *rusiServerImpl {
return &rusiServerImpl{
refreshChannels: []chan bool{},
refreshChannels: map[string]chan bool{},
mainCtx: ctx,
subsWaitGroup: &sync.WaitGroup{},
publishHandler: publishHandler,
Expand All @@ -96,26 +97,23 @@ func (srv *rusiServerImpl) Refresh() error {
return nil
}

func (srv *rusiServerImpl) createRefreshChan() chan bool {
func (srv *rusiServerImpl) getRefreshChan(subId string) chan bool {
srv.mu.Lock()
defer srv.mu.Unlock()

c := make(chan bool)
srv.refreshChannels = append(srv.refreshChannels, c)
return c
if c, ok := srv.refreshChannels[subId]; ok {
return c
} else {
c := make(chan bool)
srv.refreshChannels[subId] = c
return c
}
}

func (srv *rusiServerImpl) removeRefreshChan(refreshChan chan bool) {
func (srv *rusiServerImpl) removeRefreshChan(subId string) {
srv.mu.Lock()
defer srv.mu.Unlock()

var s []chan bool
for _, channel := range srv.refreshChannels {
if channel != refreshChan {
s = append(s, channel)
}
}
srv.refreshChannels = s
delete(srv.refreshChannels, subId)
}

// Subscribe creates a subscription
Expand All @@ -132,10 +130,9 @@ func (srv *rusiServerImpl) Subscribe(stream v1.Rusi_SubscribeServer) error {
if request == nil {
return errors.New("invalid subscription request")
}

exit := false
refreshChan := srv.createRefreshChan()
defer srv.removeRefreshChan(refreshChan)
subId := uuid.NewString()
defer srv.removeRefreshChan(subId)
handler := srv.buildSubscribeHandler(stream)
for {
hCtx, hCancel := context.WithCancel(context.Background())
Expand All @@ -145,7 +142,6 @@ func (srv *rusiServerImpl) Subscribe(stream v1.Rusi_SubscribeServer) error {
Handler: handler(hCtx),
Options: messagingSubscriptionOptions(request.GetOptions()),
})

if err != nil {
hCancel()
return err
Expand All @@ -159,7 +155,7 @@ func (srv *rusiServerImpl) Subscribe(stream v1.Rusi_SubscribeServer) error {
case <-stream.Context().Done():
exit = true
err = stream.Context().Err()
case <-refreshChan:
case <-srv.getRefreshChan(subId):
exit = false
klog.V(4).InfoS("Refresh requested for", "topic", request.Topic)
}
Expand Down

0 comments on commit adfbfb0

Please sign in to comment.