Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add ability to provide dynamic query parameters #44

Merged
merged 2 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"io/ioutil"
"net/http"
"net/url"
"sync"
"time"
)
Expand All @@ -14,11 +15,12 @@ import (
// It will try and reconnect if the connection is lost, respecting both
// received retry delays and event id's.
type Stream struct {
c *http.Client
req *http.Request
lastEventID string
readTimeout time.Duration
retryDelay *retryDelayStrategy
c *http.Client
req *http.Request
queryParamsFunc *func() url.Values
lastEventID string
readTimeout time.Duration
retryDelay *retryDelayStrategy
// Events emits the events received by the stream
Events chan Event
// Errors emits any errors encountered while reading events from the stream.
Expand Down Expand Up @@ -187,6 +189,10 @@ func newStream(request *http.Request, configuredOptions streamOptions) *Stream {
closer: make(chan struct{}),
}

if configuredOptions.queryParamsFunc != nil {
stream.queryParamsFunc = configuredOptions.queryParamsFunc
}

if configuredOptions.errorHandler == nil {
// The Errors channel is only used if there is no error handler.
stream.Errors = make(chan error)
Expand Down Expand Up @@ -231,6 +237,9 @@ func (stream *Stream) connect() (io.ReadCloser, error) {
stream.req.Header.Set("Last-Event-ID", stream.lastEventID)
}
req := *stream.req
if stream.queryParamsFunc != nil {
req.URL.RawQuery = (*stream.queryParamsFunc)().Encode()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be a merge? Or do we want replacement behavior?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we need replacement to be able to remove a parameter.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated so that the previously defined query parameters are sent along on subsequent requests.

}

// All but the initial connection will need to regenerate the body
if stream.connections > 0 && req.GetBody != nil {
Expand Down
18 changes: 18 additions & 0 deletions stream_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package eventsource

import (
"net/http"
"net/url"
"time"
)

Expand All @@ -16,6 +17,7 @@ type streamOptions struct {
retryResetInterval time.Duration
initialRetryTimeout time.Duration
errorHandler StreamErrorHandler
queryParamsFunc *func() url.Values
}

// StreamOption is a common interface for optional configuration parameters that can be
Expand All @@ -24,6 +26,22 @@ type StreamOption interface {
apply(s *streamOptions) error
}

type dynamicQueryParamsOption struct {
queryParamsFunc func() url.Values
}

func (o dynamicQueryParamsOption) apply(s *streamOptions) error {
s.queryParamsFunc = &o.queryParamsFunc
return nil
}

// StreamOptionDynamicQueryParams returns an option that sets a function to
// generate query parameters each time the stream needs to make a fresh
// connection.
func StreamOptionDynamicQueryParams(f func() url.Values) StreamOption {
return dynamicQueryParamsOption{queryParamsFunc: f}
}

type readTimeoutOption struct {
timeout time.Duration
}
Expand Down
24 changes: 24 additions & 0 deletions stream_requests_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"time"

Expand Down Expand Up @@ -45,6 +46,29 @@ func TestStreamSendsLastEventID(t *testing.T) {
assert.Equal(t, lastID, r0.Request.Header.Get("Last-Event-ID"))
}

func TestCanSetStreamQueryParameters(t *testing.T) {
streamHandler, streamControl := httphelpers.SSEHandler(nil)
defer streamControl.Close()
handler, requestsCh := httphelpers.RecordingHandler(streamHandler)

httpServer := httptest.NewServer(handler)
defer httpServer.Close()

option := StreamOptionDynamicQueryParams(func() url.Values {
return url.Values{
"filter": []string{"my-custom-filter"},
"basis": []string{"last-known-basis"},
}
})

stream := mustSubscribe(t, httpServer.URL, option)
defer stream.Close()

r0 := <-requestsCh
assert.Equal(t, "my-custom-filter", r0.Request.URL.Query().Get("filter"))
assert.Equal(t, "last-known-basis", r0.Request.URL.Query().Get("basis"))
}

func TestStreamReconnectWithRequestBodySendsBodyTwice(t *testing.T) {
body := []byte("my-body")

Expand Down
Loading