diff --git a/go.mod b/go.mod index 753f6ee1..6f0e0580 100644 --- a/go.mod +++ b/go.mod @@ -22,10 +22,10 @@ require ( github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/scram v1.1.2 // indirect github.com/xdg-go/stringprep v1.0.4 // indirect - github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect - golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect + github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect; indirectrequire + golang.org/x/crypto v0.17.0 // indirect golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 // indirect - golang.org/x/text v0.13.0 // indirect + golang.org/x/text v0.14.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 17141c75..628f340b 100644 --- a/go.sum +++ b/go.sum @@ -41,8 +41,9 @@ go.mongodb.org/mongo-driver v1.13.1 h1:YIc7HTYsKndGK4RFzJ3covLz1byri52x0IoMB0Pt/ go.mongodb.org/mongo-driver v1.13.1/go.mod h1:wcDf1JBCXy2mOW0bWHwO/IOYqdca1MPCwDtFu/Z9+eo= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d h1:sK3txAijHtOK88l68nt020reeT1ZdKLIYetKl95FzVY= golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= +golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= @@ -65,8 +66,8 @@ golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= -golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= diff --git a/temporal/internal/driver/redisv9/queue.go b/temporal/internal/driver/redisv9/queue.go index 4652019e..10998299 100644 --- a/temporal/internal/driver/redisv9/queue.go +++ b/temporal/internal/driver/redisv9/queue.go @@ -3,6 +3,7 @@ package redisv9 import ( "context" "errors" + "time" "github.com/TykTechnologies/storage/temporal/model" "github.com/TykTechnologies/storage/temporal/temperr" @@ -70,7 +71,12 @@ func (m *messageAdapter) Payload() (string, error) { // Receive waits for and returns the next message from the subscription. func (r *subscriptionAdapter) Receive(ctx context.Context) (model.Message, error) { - msg, err := r.pubSub.Receive(ctx) + timeout := time.Duration(0) + if deadline, ok := ctx.Deadline(); ok { + timeout = time.Until(deadline) + } + + msg, err := r.pubSub.ReceiveTimeout(ctx, timeout) if err != nil { if errors.Is(err, redis.ErrClosed) { return nil, temperr.ClosedConnection diff --git a/temporal/internal/driver/redisv9/redisv8.go b/temporal/internal/driver/redisv9/redisv9.go similarity index 100% rename from temporal/internal/driver/redisv9/redisv8.go rename to temporal/internal/driver/redisv9/redisv9.go diff --git a/temporal/queue/queue_test.go b/temporal/queue/queue_test.go index 30d74219..9840fa94 100644 --- a/temporal/queue/queue_test.go +++ b/temporal/queue/queue_test.go @@ -278,3 +278,52 @@ func TestQueue_NewQueue(t *testing.T) { _, err := NewQueue(&testutil.StubConnector{}) assert.NotNil(t, err) } + +func TestQueue_Ctx(t *testing.T) { + connectors := testutil.TestConnectors(t) + defer testutil.CloseConnectors(t, connectors) + + for _, connector := range connectors { + t.Run(connector.Type(), func(t *testing.T) { + queue, err := NewQueue(connector) + assert.Nil(t, err) + assert.NotNil(t, queue) + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + + didReceive := make(chan bool, 1) + go func(context.Context) { + defer func() { + close(didReceive) + }() + + sub := queue.Subscribe(ctx, "test_channel") + defer sub.Close() + for { + select { + case <-ctx.Done(): + return + default: + msg, err := sub.Receive(ctx) + if msg.Type() == model.MessageTypeSubscription { + continue + } else { + assert.NotNil(t, err) + didReceive <- true + return + } + } + } + }(ctx) + + _, err = queue.Publish(context.Background(), "test_channel", "test") + assert.Nil(t, err) + + // cancel the context now that the goroutine is running + cancel() + + // wait for the goroutine to exit + <-didReceive + }) + } +}