Skip to content

Commit

Permalink
feat: support async router (#33)
Browse files Browse the repository at this point in the history
* feat: support async router in order to call HandlerFunc in a goroutine or not

* docs: fix document [skip ci]
  • Loading branch information
DarthPestilane authored Jan 10, 2022
1 parent 4199499 commit 75d4ad1
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 22 deletions.
9 changes: 8 additions & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type Server struct {
accepting chan struct{}
stopped chan struct{}
writeAttemptTimes int
asyncRouter bool
}

// ServerOption is the option for Server.
Expand All @@ -44,14 +45,18 @@ type ServerOption struct {
SocketSendDelay bool // sets the socket delay or not.
ReadTimeout time.Duration // sets the timeout for connection read.
WriteTimeout time.Duration // sets the timeout for connection write.
Packer Packer // packs and unpacks packet payload, default packer is the packet.DefaultPacker.
Packer Packer // packs and unpacks packet payload, default packer is the DefaultPacker.
Codec Codec // encodes and decodes the message data, can be nil.
RespQueueSize int // sets the response channel size of session, DefaultRespQueueSize will be used if < 0.
DoNotPrintRoutes bool // whether to print registered route handlers to the console.

// WriteAttemptTimes sets the max attempt times for packet writing in each session.
// The DefaultWriteAttemptTimes will be used if <= 0.
WriteAttemptTimes int

// AsyncRouter represents whether to execute a route HandlerFunc of each session in a goroutine.
// true means execute in a goroutine.
AsyncRouter bool
}

// ErrServerStopped is returned when server stopped.
Expand Down Expand Up @@ -88,6 +93,7 @@ func NewServer(opt *ServerOption) *Server {
accepting: make(chan struct{}),
stopped: make(chan struct{}),
writeAttemptTimes: opt.WriteAttemptTimes,
asyncRouter: opt.AsyncRouter,
}
}

Expand Down Expand Up @@ -161,6 +167,7 @@ func (s *Server) handleConn(conn net.Conn) {
Packer: s.Packer,
Codec: s.Codec,
respQueueSize: s.respQueueSize,
asyncRouter: s.asyncRouter,
})
if s.OnSessionCreate != nil {
go s.OnSessionCreate(sess)
Expand Down
1 change: 1 addition & 0 deletions server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ func TestServer_handleConn(t *testing.T) {
Codec: codec,
Packer: packer,
RespQueueSize: -1,
AsyncRouter: true,
})

// hooks
Expand Down
51 changes: 30 additions & 21 deletions session.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package easytcp

import (
"github.com/DarthPestilane/easytcp/message"
"github.com/google/uuid"
"net"
"sync"
Expand Down Expand Up @@ -29,21 +30,23 @@ type Session interface {
}

type session struct {
id interface{} // session's ID.
conn net.Conn // tcp connection
closed chan struct{} // to close()
closeOnce sync.Once // ensure one session only close once
respQueue chan Context // response queue channel, pushed in Send() and popped in writeOutbound()
packer Packer // to pack and unpack message
codec Codec // encode/decode message data
ctxPool sync.Pool // router context pool
id interface{} // session's ID.
conn net.Conn // tcp connection
closed chan struct{} // to close()
closeOnce sync.Once // ensure one session only close once
respQueue chan Context // response queue channel, pushed in Send() and popped in writeOutbound()
packer Packer // to pack and unpack message
codec Codec // encode/decode message data
ctxPool sync.Pool // router context pool
asyncRouter bool // calls router HandlerFunc in a goroutine if false
}

// sessionOption is the extra options for session.
type sessionOption struct {
Packer Packer
Codec Codec
respQueueSize int
asyncRouter bool
}

// newSession creates a new session.
Expand All @@ -52,13 +55,14 @@ type sessionOption struct {
// Returns a session pointer.
func newSession(conn net.Conn, opt *sessionOption) *session {
return &session{
id: uuid.NewString(), // use uuid as default
conn: conn,
closed: make(chan struct{}),
respQueue: make(chan Context, opt.respQueueSize),
packer: opt.Packer,
codec: opt.Codec,
ctxPool: sync.Pool{New: func() interface{} { return NewContext() }},
id: uuid.NewString(), // use uuid as default
conn: conn,
closed: make(chan struct{}),
respQueue: make(chan Context, opt.respQueueSize),
packer: opt.Packer,
codec: opt.Codec,
ctxPool: sync.Pool{New: func() interface{} { return NewContext() }},
asyncRouter: opt.asyncRouter,
}
}

Expand Down Expand Up @@ -130,17 +134,22 @@ func (s *session) readInbound(router *Router, timeout time.Duration) {
continue
}

// don't block the loop.
go func() {
ctx := s.AllocateContext().SetRequestMessage(reqEntry)
router.handleRequest(ctx)
s.Send(ctx)
}()
if s.asyncRouter {
go s.handleReq(router, reqEntry)
} else {
s.handleReq(router, reqEntry)
}
}
Log.Tracef("session %s readInbound exit because of error", s.id)
s.Close()
}

func (s *session) handleReq(router *Router, reqEntry *message.Entry) {
ctx := s.AllocateContext().SetRequestMessage(reqEntry)
router.handleRequest(ctx)
s.Send(ctx)
}

// writeOutbound fetches message from respQueue channel and writes to TCP connection in a loop.
// Parameter writeTimeout specified the connection writing timeout.
// The loop breaks if errors occurred, or the session is closed.
Expand Down

0 comments on commit 75d4ad1

Please sign in to comment.