Skip to content

Commit

Permalink
feat: basic building block for wss
Browse files Browse the repository at this point in the history
wip

fix: rename pkg

wip

feat: update building block
  • Loading branch information
nick-bisonai committed May 27, 2024
1 parent fa8ec13 commit 5c4061f
Show file tree
Hide file tree
Showing 4 changed files with 187 additions and 0 deletions.
1 change: 1 addition & 0 deletions node/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
github.com/rs/zerolog v1.32.0
github.com/stretchr/testify v1.8.4
golang.org/x/crypto v0.19.0
nhooyr.io/websocket v1.8.11
)

require (
Expand Down
2 changes: 2 additions & 0 deletions node/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -963,6 +963,8 @@ inet.af/netaddr v0.0.0-20220617031823-097006376321 h1:B4dC8ySKTQXasnjDTMsoCMf1sQ
inet.af/netaddr v0.0.0-20220617031823-097006376321/go.mod h1:OIezDfdzOgFhuw4HuWapWq2e9l0H9tK4F1j+ETRtF3k=
lukechampine.com/blake3 v1.2.1 h1:YuqqRuaqsGV71BV/nm9xlI0MKUv4QC54jQnBChWbGnI=
lukechampine.com/blake3 v1.2.1/go.mod h1:0OFRp7fBtAylGVCO40o87sbupkyIGgbpv1+M1k1LM6k=
nhooyr.io/websocket v1.8.11 h1:f/qXNc2/3DpoSZkHt1DQu6rj4zGC8JmkkLkWss0MgN0=
nhooyr.io/websocket v1.8.11/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c=
rsc.io/tmplfunc v0.0.3 h1:53XFQh69AfOa8Tw0Jm7t+GV7KZhOi6jzsCzTtKbMvzU=
rsc.io/tmplfunc v0.0.3/go.mod h1:AG3sTPzElb1Io3Yg4voV9AGZJuleGAwaVRxL9M49PhA=
sourcegraph.com/sourcegraph/go-diff v0.5.0/go.mod h1:kuch7UrkMzY0X+p9CRK03kfuPQ2zzQcaEFbx8wA8rck=
Expand Down
102 changes: 102 additions & 0 deletions node/pkg/wss/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package wss

import (
"context"
"fmt"
"net/http"
"net/url"

"github.com/rs/zerolog/log"
"nhooyr.io/websocket"
"nhooyr.io/websocket/wsjson"
)

type wsConn struct {
*websocket.Conn
}

type ConnectionConfig struct {
Endpoint string
ProxyUrl string
}

type ConnectionOption func(*ConnectionConfig)

func WithEndpoint(endpoint string) ConnectionOption {
return func(c *ConnectionConfig) {
c.Endpoint = endpoint
}
}

func WithProxyUrl(proxyUrl string) ConnectionOption {
return func(c *ConnectionConfig) {
c.ProxyUrl = proxyUrl
}
}

func NewConnection(ctx context.Context, opts ...ConnectionOption) (*wsConn, error) {
config := &ConnectionConfig{}
for _, opt := range opts {
opt(config)
}

if config.Endpoint == "" {
log.Error().Msg("endpoint is required")
return nil, fmt.Errorf("endpoint is required")
}

dialOption := &websocket.DialOptions{}

if config.ProxyUrl != "" {
proxyURL, err := url.Parse(config.ProxyUrl)
if err != nil {
return nil, err
}

proxyTransport := http.DefaultTransport.(*http.Transport).Clone()
proxyTransport.Proxy = http.ProxyURL(proxyURL)

dialOption = &websocket.DialOptions{
HTTPClient: &http.Client{
Transport: proxyTransport,
},
}
}

conn, _, err := websocket.Dial(ctx, config.Endpoint, dialOption)
if err != nil {
log.Error().Err(err).Msg("error opening websocket connection")
return nil, err
}
return &wsConn{conn}, nil
}

func (ws *wsConn) Write(ctx context.Context, message interface{}) error {
err := wsjson.Write(ctx, ws.Conn, message)
if err != nil {
log.Error().Err(err).Msg("error writing to websocket")
return err
}
return nil
}

func (ws *wsConn) Read(ctx context.Context, ch chan interface{}) {
for {
var t interface{}
err := wsjson.Read(ctx, ws.Conn, &t)
if err != nil {
log.Error().Err(err).Msg("error reading from websocket")
break
}
ch <- t
}
}

func (ws *wsConn) Close(ctx context.Context) error {
err := ws.Conn.Close(websocket.StatusNormalClosure, "")
if err != nil {
log.Error().Err(err).Msg("error closing websocket")
return err
}
return nil
}
82 changes: 82 additions & 0 deletions node/pkg/wss/wss.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package wss

import (
"context"
"fmt"
"sync"

"nhooyr.io/websocket"
)

type Connections struct {
m map[string]*websocket.Conn
lock sync.RWMutex
}

var (
connections *Connections
)

func init() {
connections = &Connections{
m: make(map[string]*websocket.Conn),
}
}

func (c *Connections) Update(key string, conn *websocket.Conn) error {
c.lock.Lock()
defer c.lock.Unlock()

if c.m[key] != nil {
err := c.m[key].Close(websocket.StatusNormalClosure, "")
if err != nil {
return err
}
}
c.m[key] = conn
return nil
}

func (c *Connections) Remove(key string) error {
c.lock.Lock()
defer c.lock.Unlock()

if c.m[key] != nil {
err := c.m[key].Close(websocket.StatusNormalClosure, "")
if err != nil {
return err
}
}
delete(c.m, key)
return nil
}

func (c *Connections) Get(key string) (*websocket.Conn, error) {
c.lock.RLock()
defer c.lock.RUnlock()

conn, ok := c.m[key]
if !ok {
return nil, fmt.Errorf("connection not found")
}
return conn, nil
}

func getConnections() *Connections {
return connections
}

func UpdateConnection(ctx context.Context, key string, conn *websocket.Conn) error {
connections := getConnections()
return connections.Update(key, conn)
}

func GetConnection(key string) (*websocket.Conn, error) {
connections := getConnections()
return connections.Get(key)
}

func RemoveConnection(key string) error {
connections := getConnections()
return connections.Remove(key)
}

0 comments on commit 5c4061f

Please sign in to comment.