Skip to content

Commit

Permalink
Queue context timeout (#107)
Browse files Browse the repository at this point in the history
* Queue context timeout

* removing debug log

* tidy

* linting

* using time.Until
  • Loading branch information
tbuchaillot authored Jan 31, 2024
1 parent 0ae3bf9 commit d2a325e
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 7 deletions.
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ require (
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect; indirectrequire
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/text v0.14.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
7 changes: 4 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ go.mongodb.org/mongo-driver v1.13.1 h1:YIc7HTYsKndGK4RFzJ3covLz1byri52x0IoMB0Pt/
go.mongodb.org/mongo-driver v1.13.1/go.mod h1:wcDf1JBCXy2mOW0bWHwO/IOYqdca1MPCwDtFu/Z9+eo=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d h1:sK3txAijHtOK88l68nt020reeT1ZdKLIYetKl95FzVY=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k=
golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
Expand All @@ -65,8 +66,8 @@ golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
Expand Down
8 changes: 7 additions & 1 deletion temporal/internal/driver/redisv9/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package redisv9
import (
"context"
"errors"
"time"

"github.com/TykTechnologies/storage/temporal/model"
"github.com/TykTechnologies/storage/temporal/temperr"
Expand Down Expand Up @@ -70,7 +71,12 @@ func (m *messageAdapter) Payload() (string, error) {

// Receive waits for and returns the next message from the subscription.
func (r *subscriptionAdapter) Receive(ctx context.Context) (model.Message, error) {
msg, err := r.pubSub.Receive(ctx)
timeout := time.Duration(0)
if deadline, ok := ctx.Deadline(); ok {
timeout = time.Until(deadline)
}

msg, err := r.pubSub.ReceiveTimeout(ctx, timeout)
if err != nil {
if errors.Is(err, redis.ErrClosed) {
return nil, temperr.ClosedConnection
Expand Down
File renamed without changes.
49 changes: 49 additions & 0 deletions temporal/queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,3 +278,52 @@ func TestQueue_NewQueue(t *testing.T) {
_, err := NewQueue(&testutil.StubConnector{})
assert.NotNil(t, err)
}

func TestQueue_Ctx(t *testing.T) {
connectors := testutil.TestConnectors(t)
defer testutil.CloseConnectors(t, connectors)

for _, connector := range connectors {
t.Run(connector.Type(), func(t *testing.T) {
queue, err := NewQueue(connector)
assert.Nil(t, err)
assert.NotNil(t, queue)

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)

didReceive := make(chan bool, 1)
go func(context.Context) {
defer func() {
close(didReceive)
}()

sub := queue.Subscribe(ctx, "test_channel")
defer sub.Close()
for {
select {
case <-ctx.Done():
return
default:
msg, err := sub.Receive(ctx)
if msg.Type() == model.MessageTypeSubscription {
continue
} else {
assert.NotNil(t, err)
didReceive <- true
return
}
}
}
}(ctx)

_, err = queue.Publish(context.Background(), "test_channel", "test")
assert.Nil(t, err)

// cancel the context now that the goroutine is running
cancel()

// wait for the goroutine to exit
<-didReceive
})
}
}

0 comments on commit d2a325e

Please sign in to comment.