Skip to content
This repository has been archived by the owner on Nov 19, 2024. It is now read-only.

Commit

Permalink
add: mutexes to avoid concurrents r/w
Browse files Browse the repository at this point in the history
  • Loading branch information
ggmolly committed Jun 15, 2024
1 parent 9e01b75 commit 24875fe
Showing 1 changed file with 45 additions and 14 deletions.
59 changes: 45 additions & 14 deletions connection/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"reflect"
"sync"

"github.com/ggmolly/belfast/consts"
"github.com/ggmolly/belfast/debug"
"github.com/ggmolly/belfast/logger"
"github.com/ggmolly/belfast/orm"
Expand All @@ -24,8 +25,13 @@ type Server struct {
SocketFD int
EpollFD int
Dispatcher ServerDispatcher
rooms map[uint32][]*Client
Region string

// Maps & mutexes
roomsMutex sync.RWMutex
rooms map[uint32][]*Client // Game chat rooms
clientsMutex sync.RWMutex
clients map[uint32]*Client // Socket hash -> Client
}

var (
Expand All @@ -39,36 +45,43 @@ func (server *Server) GetClient(conn *net.Conn) (*Client, error) {
client.Port = (*conn).RemoteAddr().(*net.TCPAddr).Port
client.Connection = conn
client.Server = server
for _, c := range fmt.Sprintf("%s:%d", client.IP, client.Port) {
client.Hash += uint32(c)
}
return &client, err
}

func (server *Server) AddClient(client *Client) {
logger.LogEvent("Server", "hewwo", fmt.Sprintf("new connection from %s:%d", client.IP, client.Port), logger.LOG_LEVEL_DEBUG)
logger.LogEvent("Server", "Hello", fmt.Sprintf("new connection from %s:%d", client.IP, client.Port), logger.LOG_LEVEL_DEBUG)
client.Server.clientsMutex.Lock()
defer client.Server.clientsMutex.Unlock()
server.clients[client.Hash] = client
}

func (server *Server) RemoveClient(client *Client) {
logger.LogEvent("Server", "cya", fmt.Sprintf("%s:%d", client.IP, client.Port), logger.LOG_LEVEL_DEBUG)
client.Server.clientsMutex.Lock()
defer client.Server.clientsMutex.Unlock()
logger.LogEvent("Server", "Goodbye", fmt.Sprintf("%s:%d", client.IP, client.Port), logger.LOG_LEVEL_DEBUG)
(*client.Connection).Close()
delete(server.clients, client.Hash)
}

func handleConnection(conn net.Conn, wg *sync.WaitGroup, server *Server) {
defer wg.Done()
func handleConnection(conn net.Conn, server *Server) {
logger.LogEvent("Server", "TEST", "Goroutine started", logger.LOG_LEVEL_WARN)
defer conn.Close()

defer logger.LogEvent("Server", "TEST", "Goroutine ended", logger.LOG_LEVEL_WARN)
// Add the client to the list
client, err := server.GetClient(&conn)

if err != nil {
logger.LogEvent("Server", "Handler", fmt.Sprintf("client %s -- error: %v", conn.RemoteAddr(), err), logger.LOG_LEVEL_ERROR)
conn.Close()
server.RemoveClient(client)
return
}

if !client.IP.IsPrivate() {
logger.LogEvent("Server", "Handler", fmt.Sprintf("client %s -- not in a private range", conn.RemoteAddr()), logger.LOG_LEVEL_ERROR)
conn.Close()
server.RemoveClient(client)
return
}

Expand All @@ -95,7 +108,7 @@ func handleConnection(conn net.Conn, wg *sync.WaitGroup, server *Server) {
// these two bytes are the length of a message
size := int(packerBuffer[0])<<8 | int(packerBuffer[1]) + 2 // take into account the 2 bytes for the size
if totalBytes >= size {
// We have a full message, slice it and send it to the dispatcher
// Slice the packerBuffer to get the message and send it to the dispatcher
message := packerBuffer[:size]
server.Dispatcher(&message, client, size)
// Remove the message from the packerBuffer and shift the rest of the buffer
Expand All @@ -117,18 +130,14 @@ func (server *Server) Run() error {
defer listener.Close()
logger.LogEvent("Server", "Run", fmt.Sprintf("listening on %s:%d", server.BindAddress, server.Port), logger.LOG_LEVEL_INFO)

var wg sync.WaitGroup
for {
conn, err := listener.Accept()
if err != nil {
logger.LogEvent("Server", "Run", fmt.Sprintf("error accepting: %v", err), logger.LOG_LEVEL_ERROR)
continue
}
wg.Add(1)
go handleConnection(conn, &wg, server)
go handleConnection(conn, server)
}
wg.Wait()
return nil
}

func NewServer(bindAddress string, port int, dispatcher ServerDispatcher) *Server {
Expand All @@ -137,16 +146,34 @@ func NewServer(bindAddress string, port int, dispatcher ServerDispatcher) *Serve
Port: port,
Dispatcher: dispatcher,
Region: os.Getenv("AL_REGION"),
clients: make(map[uint32]*Client),
rooms: make(map[uint32][]*Client),
}
}

// Sends SC_10999 (disconnected from server) message to every connected clients, reasons are defined in consts/disconnect_reasons.go
func (server *Server) DisconnectAll(reason uint8) {
server.clientsMutex.Lock()
defer server.clientsMutex.Unlock()
for _, client := range server.clients {
logger.LogEvent("Server", "Disconnect", fmt.Sprintf("disconnecting %s:%d -> %s", client.IP, client.Port, consts.ResolveReason(reason)), logger.LOG_LEVEL_DEBUG)
client.Disconnect(reason)
client.Flush()
(*client.Connection).Close()
delete(server.clients, client.Hash)
}
}

// Chat room management
func (server *Server) JoinRoom(roomID uint32, client *Client) {
server.roomsMutex.Lock()
defer server.roomsMutex.Unlock()
server.rooms[roomID] = append(server.rooms[roomID], client)
}

func (server *Server) LeaveRoom(roomID uint32, client *Client) {
server.roomsMutex.Lock()
defer server.roomsMutex.Unlock()
for i, c := range server.rooms[roomID] {
if c == client {
server.rooms[roomID] = append(server.rooms[roomID][:i], server.rooms[roomID][i+1:]...)
Expand All @@ -156,6 +183,8 @@ func (server *Server) LeaveRoom(roomID uint32, client *Client) {
}

func (server *Server) ChangeRoom(oldRoomID uint32, newRoomID uint32, client *Client) {
server.roomsMutex.Lock()
defer server.roomsMutex.Unlock()
for i, c := range server.rooms[oldRoomID] {
if c == client {
server.rooms[oldRoomID] = append(server.rooms[oldRoomID][:i], server.rooms[oldRoomID][i+1:]...)
Expand All @@ -175,6 +204,8 @@ func (server *Server) SendMessage(sender *Client, message orm.Message) {
Type: proto.Uint32(orm.MSG_TYPE_NORMAL),
Content: proto.String(message.Content),
}
server.roomsMutex.RLock()
defer server.roomsMutex.RUnlock()
for _, client := range server.rooms[message.RoomID] {
client.SendMessage(50101, &msgPacket)
}
Expand Down

0 comments on commit 24875fe

Please sign in to comment.