forked from donovanhide/eventsource
-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathstream_requests_test.go
154 lines (118 loc) · 4.26 KB
/
stream_requests_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package eventsource
import (
"bytes"
"net/http"
"net/http/httptest"
"net/url"
"strconv"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/launchdarkly/go-test-helpers/v2/httphelpers"
)
func TestStreamCanUseCustomClient(t *testing.T) {
streamHandler, streamControl := httphelpers.SSEHandler(nil)
defer streamControl.Close()
handler, requestsCh := httphelpers.RecordingHandler(streamHandler)
httpServer := httptest.NewServer(handler)
defer httpServer.Close()
client := *http.DefaultClient
client.Transport = urlSuffixingRoundTripper{http.DefaultTransport, "path"}
stream := mustSubscribe(t, httpServer.URL, StreamOptionHTTPClient(&client))
defer stream.Close()
r := <-requestsCh
assert.Equal(t, "/path", r.Request.URL.Path)
}
func TestStreamSendsLastEventID(t *testing.T) {
streamHandler, streamControl := httphelpers.SSEHandler(nil)
defer streamControl.Close()
handler, requestsCh := httphelpers.RecordingHandler(streamHandler)
httpServer := httptest.NewServer(handler)
defer httpServer.Close()
lastID := "xyz"
stream := mustSubscribe(t, httpServer.URL, StreamOptionLastEventID(lastID))
defer stream.Close()
r0 := <-requestsCh
assert.Equal(t, lastID, r0.Request.Header.Get("Last-Event-ID"))
}
func TestCanReplaceStreamQueryParameters(t *testing.T) {
streamHandler, streamControl := httphelpers.SSEHandler(nil)
defer streamControl.Close()
handler, requestsCh := httphelpers.RecordingHandler(streamHandler)
httpServer := httptest.NewServer(handler)
defer httpServer.Close()
option := StreamOptionDynamicQueryParams(func(existing url.Values) url.Values {
return url.Values{
"filter": []string{"my-custom-filter"},
"basis": []string{"last-known-basis"},
}
})
stream := mustSubscribe(t, httpServer.URL, option)
defer stream.Close()
r0 := <-requestsCh
assert.Equal(t, "my-custom-filter", r0.Request.URL.Query().Get("filter"))
assert.Equal(t, "last-known-basis", r0.Request.URL.Query().Get("basis"))
}
func TestCanUpdateStreamQueryParameters(t *testing.T) {
streamHandler, streamControl := httphelpers.SSEHandler(nil)
defer streamControl.Close()
handler, requestsCh := httphelpers.RecordingHandler(streamHandler)
httpServer := httptest.NewServer(handler)
defer httpServer.Close()
option := StreamOptionDynamicQueryParams(func(existing url.Values) url.Values {
if existing.Has("count") {
count, _ := strconv.Atoi(existing.Get("count"))
if count == 1 {
existing.Set("count", strconv.Itoa(count+1))
return existing
}
return url.Values{}
}
return url.Values{
"initial": []string{"payload is set"},
"count": []string{"1"},
}
})
stream := mustSubscribe(t, httpServer.URL, option, StreamOptionInitialRetry(time.Millisecond))
defer stream.Close()
r0 := <-requestsCh
assert.Equal(t, "payload is set", r0.Request.URL.Query().Get("initial"))
assert.Equal(t, "1", r0.Request.URL.Query().Get("count"))
streamControl.EndAll()
<-stream.Errors // Accept the error to unblock the retry handler
r1 := <-requestsCh
assert.Equal(t, "payload is set", r1.Request.URL.Query().Get("initial"))
assert.Equal(t, "2", r1.Request.URL.Query().Get("count"))
streamControl.EndAll()
<-stream.Errors // Accept the error to unblock the retry handler
r2 := <-requestsCh
assert.False(t, r2.Request.URL.Query().Has("initial"))
assert.False(t, r2.Request.URL.Query().Has("count"))
}
func TestStreamReconnectWithRequestBodySendsBodyTwice(t *testing.T) {
body := []byte("my-body")
streamHandler, streamControl := httphelpers.SSEHandler(nil)
defer streamControl.Close()
handler, requestsCh := httphelpers.RecordingHandler(streamHandler)
httpServer := httptest.NewServer(handler)
defer httpServer.Close()
req, _ := http.NewRequest("REPORT", httpServer.URL, bytes.NewBuffer(body))
if req.GetBody == nil {
t.Fatalf("Expected get body to be set")
}
stream, err := SubscribeWithRequestAndOptions(req, StreamOptionInitialRetry(time.Millisecond))
if err != nil {
t.Fatalf("Failed to subscribe: %s", err)
return
}
defer stream.Close()
// Wait for the first request
r0 := <-requestsCh
// Allow the stream to reconnect once; get the second request
streamControl.EndAll()
<-stream.Errors // Accept the error to unblock the retry handler
r1 := <-requestsCh
stream.Close()
assert.Equal(t, body, r0.Body)
assert.Equal(t, body, r1.Body)
}