Skip to content

Commit

Permalink
fix: remove conn limits
Browse files Browse the repository at this point in the history
  • Loading branch information
nick-bisonai committed Jul 27, 2024
1 parent 2ce655f commit aa40742
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 67 deletions.
67 changes: 33 additions & 34 deletions node/pkg/dal/api/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,10 @@ func NewController(configs map[string]types.Config, internalCollector *collector
Collector: internalCollector,
configs: configs,

clients: make(map[*websocket.Conn]map[string]bool),
register: make(chan *websocket.Conn),
unregister: make(chan *websocket.Conn),
broadcast: make(map[string]chan dalcommon.OutgoingSubmissionData),
connectionCount: make(map[string]int),
clients: make(map[*websocket.Conn]map[string]bool),
register: make(chan *websocket.Conn),
unregister: make(chan *websocket.Conn),
broadcast: make(map[string]chan dalcommon.OutgoingSubmissionData),

mu: sync.RWMutex{},
}
Expand All @@ -55,38 +54,38 @@ func NewController(configs map[string]types.Config, internalCollector *collector
func (c *Controller) Start(ctx context.Context) {
go c.Collector.Start(ctx)
log.Info().Str("Player", "controller").Msg("api collector started")
go func() {
for {
select {
case conn := <-c.register:
apiKey := conn.Headers("X-Api-Key")
c.mu.Lock()
if _, ok := c.clients[conn]; !ok {
c.clients[conn] = make(map[string]bool)
c.connectionCount[apiKey]++
}
c.mu.Unlock()
case conn := <-c.unregister:
apiKey := conn.Headers("X-Api-Key")
c.mu.Lock()
if _, ok := c.clients[conn]; ok {
c.connectionCount[apiKey]--
delete(c.clients, conn)
}
go c.handleConnection(ctx)
log.Info().Str("Player", "contrller").Msg("connection handler started")
c.startBroadCast()
}

func (c *Controller) handleConnection(ctx context.Context) {
for {
select {
case conn := <-c.register:
c.mu.Lock()
if _, ok := c.clients[conn]; !ok {
c.clients[conn] = make(map[string]bool)
}
c.mu.Unlock()
case conn := <-c.unregister:
c.mu.Lock()
delete(c.clients, conn)
conn.Close()
c.mu.Unlock()
case <-ctx.Done():
c.mu.Lock()
for conn := range c.clients {
delete(c.clients, conn)
conn.Close()
c.mu.Unlock()
case <-ctx.Done():
c.mu.Lock()
for conn := range c.clients {
delete(c.clients, conn)
conn.Close()
}
c.mu.Unlock()
return
}
c.mu.Unlock()
return
}
}()
}
}

func (c *Controller) startBroadCast() {
for configId, stream := range c.Collector.OutgoingStream {
symbol := c.configIdToSymbol(configId)
if symbol == "" {
Expand All @@ -96,7 +95,7 @@ func (c *Controller) Start(ctx context.Context) {
}

for symbol := range c.configs {
go c.broadcastDataForSymbol(symbol)
c.broadcastDataForSymbol(symbol)
}
}

Expand Down
11 changes: 5 additions & 6 deletions node/pkg/dal/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@ type Subscription struct {
type Controller struct {
Collector *collector.Collector

configs map[string]types.Config
clients map[*websocket.Conn]map[string]bool
register chan *websocket.Conn
unregister chan *websocket.Conn
broadcast map[string]chan dalcommon.OutgoingSubmissionData
connectionCount map[string]int
configs map[string]types.Config
clients map[*websocket.Conn]map[string]bool
register chan *websocket.Conn
unregister chan *websocket.Conn
broadcast map[string]chan dalcommon.OutgoingSubmissionData

mu sync.RWMutex
}
Expand Down
29 changes: 2 additions & 27 deletions node/pkg/dal/api/websocketcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package api
import (
"context"
"strings"
"time"

"bisonai.com/orakl/node/pkg/dal/utils/stats"
"github.com/gofiber/contrib/websocket"
Expand Down Expand Up @@ -32,13 +31,6 @@ func HandleWebsocket(conn *websocket.Conn) {

ctx := *ctxPointer
apiKey := conn.Headers("X-Api-Key")
c.mu.RLock()
if c.connectionCount[apiKey] >= 10 {
log.Error().Str("Player", "controller").Msg("too many connections")
c.mu.RUnlock()
return
}
c.mu.RUnlock()

c.register <- conn

Expand All @@ -59,38 +51,21 @@ func HandleWebsocket(conn *websocket.Conn) {
log.Info().Str("Player", "controller").Int32("id", id).Msg("updated websocket connection")
}()

subscriptionTimer := time.NewTimer(5 * time.Second)
defer subscriptionTimer.Stop()

go func(ctx context.Context, conn *websocket.Conn, subscriptionTimer *time.Timer, c *Controller) {
for {
select {
case <-subscriptionTimer.C:
conn.CloseHandler()(websocket.ClosePolicyViolation, "no subscription in 5 seconds")
return
case <-ctx.Done():
conn.CloseHandler()(websocket.CloseNormalClosure, "normal closure")
return
}
}
}(ctx, conn, subscriptionTimer, c)

for {
if err := handleMessage(ctx, conn, c, id, subscriptionTimer); err != nil {
if err := handleMessage(ctx, conn, c, id); err != nil {
log.Error().Str("Player", "controller").Err(err).Msg("failed to handle message")
return
}
}
}

func handleMessage(ctx context.Context, conn *websocket.Conn, c *Controller, id int32, subscriptionTimer *time.Timer) error {
func handleMessage(ctx context.Context, conn *websocket.Conn, c *Controller, id int32) error {
var msg Subscription
if err := conn.ReadJSON(&msg); err != nil {
return err
}

if msg.Method == "SUBSCRIBE" {
subscriptionTimer.Stop()
c.mu.Lock()
defer c.mu.Unlock()
if c.clients[conn] == nil {
Expand Down

0 comments on commit aa40742

Please sign in to comment.