This repository has been archived by the owner on Apr 3, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathsocket.go
195 lines (167 loc) · 5.12 KB
/
socket.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
package main
import (
"log"
"net"
"net/http"
"net/url"
"sync"
"time"
"github.com/gorilla/websocket"
)
// Timing parameters for the socket's write loop.
const (
// writeWait is the time allowed to write a message to the peer; write() uses it
// to set the write deadline before each message.
writeWait = 10 * time.Second
// pingPeriod is the interval at which writePump sends ping messages to the peer.
// NOTE(review): the previous comment said this must be less than pongWait, but no
// pongWait constant or read deadline is defined in this file - confirm whether one is intended.
pingPeriod = 60 * time.Second
)
// Socket exposes some methods for interacting with a websocket.
type Socket interface {
// Write submits a payload to the web socket as a text message.
Write([]byte)
// GetName returns the name that was provided when the socket was created.
GetName() string
// Close closes the socket.
Close()
}
// s is the implementation of Socket.
type s struct {
// a string associated with the socket; passed to onClose when the socket is closed
name string
// the websocket connection
ws *websocket.Conn
// buffered channel of outbound messages, drained by writePump
send chan []byte
// signalled by readPump after a read error so that writePump stops
shutdown chan bool
// set to true by the first call to Close; guarded by closeLock
closed bool
closeLock *sync.Mutex
// event functions
// onRead is called by readPump for every message read from the connection
onRead func(messageType int, message []byte)
// onClose is called with name by Close, just before the connection is closed
onClose func(name string)
}
// NewSocket answers a client's websocket request by upgrading the HTTP
// connection behind w and r, and returns a Socket wrapping the result.
//
// name is an identifying string for the socket; it is handed to oc when the
// socket is closed.
// or is called for every message read from the socket, from the socket's read
// goroutine. Its first argument is the message type (RFC 6455, section 11.8).
// oc is called with name when the socket is just about to be closed.
// Either callback may be nil if the caller does not care about it.
//
// If the upgrade fails, an HTTP error is written to w (400 when the request
// was not a websocket handshake, 500 otherwise) and the error is returned.
func NewSocket(name string, w http.ResponseWriter, r *http.Request, or func(int, []byte), oc func(string)) (Socket, error) {
	ws, err := websocket.Upgrade(w, r, nil, 1024, 1024)
	if err != nil {
		if _, isHandshake := err.(websocket.HandshakeError); isHandshake {
			http.Error(w, "Not a websocket handshake", http.StatusBadRequest)
		} else {
			http.Error(w, "Error while opening websocket!", http.StatusInternalServerError)
		}
		return nil, err
	}
	return socketSetup(name, ws, or, oc), nil
}
// NewClient creates a client web socket connection to the host running at the provided URL.
//
// name is an identifying string for the socket; it is handed to oc when the
// socket is closed.
// socketURL is the URL of the websocket endpoint. When it carries no explicit
// port, the scheme's default is used: 443 for "wss"/"https", 80 otherwise.
// or is called for every message read from the socket, from the socket's read
// goroutine. Its first argument is the message type (RFC 6455, section 11.8).
// oc is called with name when the socket is just about to be closed.
// Either callback may be nil if the caller does not care about it.
//
// An error is returned (and logged) if the URL cannot be parsed, the TCP
// connection cannot be established, or the websocket handshake fails.
func NewClient(name string, socketURL string, or func(int, []byte), oc func(string)) (Socket, error) {
	u, err := url.Parse(socketURL)
	if err != nil {
		log.Println("Could not parse URL from provided URL string:", socketURL, err)
		return nil, err
	}

	// net.Dial needs "host:port"; a URL such as ws://example.com/path has no
	// port, so fill in the scheme's default instead of failing with
	// "missing port in address".
	addr := u.Host
	if u.Hostname() != "" && u.Port() == "" {
		port := "80"
		if u.Scheme == "wss" || u.Scheme == "https" {
			port = "443"
		}
		addr = net.JoinHostPort(u.Hostname(), port)
	}

	conn, err := net.Dial("tcp", addr)
	if err != nil {
		log.Println("Could not connect to provided host:", addr, err)
		return nil, err
	}

	ws, _, err := websocket.NewClient(conn, u, nil, 1024, 1024)
	if err != nil {
		// Do not leak the TCP connection when the handshake fails. The error
		// from Close is deliberately ignored: the handshake error is the one
		// worth reporting, and the library may already have closed conn.
		_ = conn.Close()
		log.Println("Error while opening websocket:", err)
		return nil, err
	}
	return socketSetup(name, ws, or, oc), nil
}
// socketSetup wraps an established websocket connection in a Socket and starts
// its write and read goroutines. A nil or/oc callback is replaced by a no-op so
// the pumps never have to nil-check them.
func socketSetup(name string, ws *websocket.Conn, or func(int, []byte), oc func(string)) Socket {
	onRead, onClose := or, oc
	if onRead == nil {
		onRead = func(int, []byte) {}
	}
	if onClose == nil {
		onClose = func(string) {}
	}

	sock := &s{
		name:      name,
		ws:        ws,
		send:      make(chan []byte, 256),
		shutdown:  make(chan bool),
		closeLock: new(sync.Mutex),
		onRead:    onRead,
		onClose:   onClose,
	}

	go sock.writePump()
	go sock.readPump()
	return sock
}
// Write queues payload on the outbound channel; writePump later sends it to
// the peer as a text message. The call blocks while the outbound buffer is
// full.
// NOTE(review): nothing drains the buffer once writePump has exited, so a
// Write on a dead socket with a full buffer would block - confirm callers stop
// writing after the close callback fires.
func (s *s) Write(payload []byte) {
	s.send <- payload
}
// Close closes the socket. The first call marks the socket closed, invokes the
// onClose callback with the socket's name and then closes the underlying
// websocket connection; every later call is a no-op.
func (s *s) Close() {
	s.closeLock.Lock()
	if s.closed {
		// Release the lock on the already-closed path too; returning with it
		// held would deadlock every subsequent call to Close.
		s.closeLock.Unlock()
		return
	}
	s.closed = true
	// Unlock before running the callback so that onClose may itself call
	// Close without deadlocking.
	s.closeLock.Unlock()

	s.onClose(s.name)
	s.ws.Close()
}
// GetName returns the identifying name the socket was created with.
func (s *s) GetName() string {
	return s.name
}
// readPump pumps messages from the websocket to the onRead callback. It runs
// until a read fails, then closes the socket and signals writePump to stop.
func (s *s) readPump() {
	for {
		mt, message, err := s.ws.ReadMessage()
		if err != nil {
			// This happens anytime a client closes the connection, which can end up with
			// chatty logs, so we aren't logging this error currently. If we did, it would look like:
			// log.Println("[" + s.name + "]", "Error during socket read:", err)
			s.Close()
			// Close the channel rather than sending on it: writePump may have
			// already returned after a failed write or ping, and a send on the
			// unbuffered channel would then block this goroutine forever.
			// readPump is the only signaller and reaches this point once, so
			// the channel is closed exactly once.
			close(s.shutdown)
			return
		}
		s.onRead(mt, message)
	}
}
// writePump pumps messages to the websocket. It forwards queued payloads as
// text messages and sends a ping every pingPeriod. It returns when readPump
// signals shutdown, or after closing the socket when a write or ping fails.
func (s *s) writePump() {
	pinger := time.NewTicker(pingPeriod)
	defer pinger.Stop()

	// fail logs a failed write and closes the socket.
	fail := func(what string, err error) {
		log.Println("["+s.name+"]", what, err)
		s.Close()
	}

	for {
		select {
		case <-s.shutdown:
			return

		case payload := <-s.send:
			if err := s.write(websocket.TextMessage, payload); err != nil {
				fail("Error during socket write:", err)
				return
			}

		case <-pinger.C:
			if err := s.write(websocket.PingMessage, []byte{}); err != nil {
				fail("Error during ping for socket:", err)
				return
			}
		}
	}
}
// write sends a single message of type mt carrying payload, allowing at most
// writeWait for the write to complete. It returns the error from the
// underlying WriteMessage call.
func (s *s) write(mt int, payload []byte) error {
	deadline := time.Now().Add(writeWait)
	s.ws.SetWriteDeadline(deadline)
	return s.ws.WriteMessage(mt, payload)
}