diff --git a/client/client.go b/client/client.go index dce0dd9..15088ea 100644 --- a/client/client.go +++ b/client/client.go @@ -2,6 +2,7 @@ package client import ( "context" + "crypto/tls" "fmt" "sync/atomic" "time" @@ -13,17 +14,20 @@ import ( "github.com/vmihailenco/msgpack/v5" "google.golang.org/grpc" "google.golang.org/grpc/backoff" + "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" _ "google.golang.org/grpc/encoding/gzip" "google.golang.org/grpc/keepalive" ) -type bridgerAgent struct { +type BridgerAgent struct { *gRpcClientPool deadline *time.Duration } -func RegisterBridgerClient(opt *options.Options) *bridgerAgent { +var reuseOpts = []grpc.DialOption{} + +func RegisterBridgerClient(opt *options.Options) *BridgerAgent { localLogging := logrus.New() if opt.MinChannelSize > opt.MaxChannelSize { panic("min channel size can't exceed max channel size") @@ -44,7 +48,7 @@ func RegisterBridgerClient(opt *options.Options) *bridgerAgent { } return options.DefaultMsgSize }() - bridger := &bridgerAgent{} + bridger := &BridgerAgent{} agents := &gRpcClientPool{} agents.poolSize = &atomic.Int32{} agents.maxpoolsize = func() int { @@ -91,14 +95,18 @@ func RegisterBridgerClient(opt *options.Options) *bridgerAgent { }, MinConnectTimeout: time.Millisecond, }), - } - clientOpts = append(clientOpts, []grpc.DialOption{ - grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultCallOptions( grpc.MaxCallRecvMsgSize(maxRecvMsgSize), grpc.MaxCallSendMsgSize(maxSendMsgSize), ), - }...) + } + if opt.Credentials { + clientOpts = append(clientOpts, grpc.WithTransportCredentials( + credentials.NewTLS(&tls.Config{}))) + } else { + clientOpts = append(clientOpts, grpc.WithTransportCredentials( + insecure.NewCredentials())) + } if opt.ClientInterceptor != nil { localLogging.WithField("action", "register-interceptor") clientOpts = append(clientOpts, grpc.WithUnaryInterceptor(opt.ClientInterceptor)) @@ -120,12 +128,14 @@ func RegisterBridgerClient(opt *options.Options) *bridgerAgent { } for i := 0; i < opt.MinChannelSize; i++ { status := true - conn, err := grpc.Dial(agents.addr, clientOpts...) + dialContext, cancel := context.WithTimeout(context.Background(), options.DialTimeout) + conn, err := grpc.DialContext(dialContext, agents.addr, clientOpts...) if err != nil { conn = nil status = false localLogging.WithField("action", "bridger-established"). Info(fmt.Sprintf("bridger-connection-%d is not established", i+1)) + cancel() return nil } if conn != nil { @@ -138,15 +148,17 @@ func RegisterBridgerClient(opt *options.Options) *bridgerAgent { status: status, sessions: &atomic.Int32{}, } + cancel() } - bridger = &bridgerAgent{ + reuseOpts = clientOpts + bridger = &BridgerAgent{ agents, &opt.Timeout, } return bridger } -func (agent *bridgerAgent) Dispatch(domain string, v interface{}, callOptions ...CallOptions) ([]byte, error) { +func (agent *BridgerAgent) Dispatch(domain string, v interface{}, callOptions ...CallOptions) ([]byte, error) { data, err := marshal(v) if err != nil { return nil, err @@ -170,8 +182,12 @@ func (agent *bridgerAgent) Dispatch(domain string, v interface{}, callOptions .. } } - val, err := pb.NewBridgerClient( - agent.getConnection().connection).Dispatch( + cp := agent.getPool() + if cp == nil { + return nil, errors.New("connection is not connected") + } + defer agent.rollbackConnection(cp) + val, err := pb.NewBridgerClient(cp.connection).Dispatch( ctx, &pb.PayloadEmitter{ Payload: data, @@ -187,7 +203,7 @@ func (agent *bridgerAgent) Dispatch(domain string, v interface{}, callOptions .. return val.GetPayload(), nil } -func (agents *bridgerAgent) Close() { +func (agents *BridgerAgent) Close() { for _, agent := range agents.pool { agent.connection.Close() } diff --git a/client/options/options.go b/client/options/options.go index 1bf1688..e921f1a 100644 --- a/client/options/options.go +++ b/client/options/options.go @@ -21,6 +21,7 @@ type Options struct { KeepAliveTimeout time.Duration KeepAliveTime time.Duration MaxSession int32 + Credentials bool } const DefaultMsgSize = 104858000 // 10mb diff --git a/client/pool.go b/client/pool.go index c32c677..bfa30e0 100644 --- a/client/pool.go +++ b/client/pool.go @@ -1,12 +1,13 @@ package client import ( + "context" "sync" "sync/atomic" "time" + "github.com/sjy-dv/bridger/client/options" "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" ) type gRpcClientPool struct { @@ -59,20 +60,15 @@ func (proxy *gRpcClientPool) getConnection() *connectionpool { return nil } -func (proxy *gRpcClientPool) establishedConnection() *grpc.ClientConn { - conn, err := grpc.Dial(proxy.addr, grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - return nil - } - return conn -} - func (proxy *gRpcClientPool) addConnection() *connectionpool { proxy.mu.Lock() defer proxy.mu.Unlock() connection := &connectionpool{} - conn, err := grpc.Dial(proxy.addr, grpc.WithTransportCredentials(insecure.NewCredentials())) + dialContext, cancel := context.WithTimeout(context.Background(), options.DialTimeout) + conn, err := grpc.DialContext(dialContext, proxy.addr, + reuseOpts...) if err != nil { + cancel() return nil } connection.connection = conn @@ -80,6 +76,7 @@ func (proxy *gRpcClientPool) addConnection() *connectionpool { connection.lastCall = time.Now() proxy.pool = append(proxy.pool, connection) proxy.poolSize.Add(1) + cancel() return connection } diff --git a/examples/long/client.go b/examples/long/client.go new file mode 100644 index 0000000..53b125a --- /dev/null +++ b/examples/long/client.go @@ -0,0 +1,53 @@ +package main + +import ( + "log" + "time" + + "github.com/sjy-dv/bridger/client" + "github.com/sjy-dv/bridger/client/options" +) + +func main() { + /** + default value + if you want to singleton instance, + min&max channel size should be set 1 + */ + bridgerClient := client.RegisterBridgerClient(&options.Options{ + Addr: "127.0.0.1:50051", + MinChannelSize: 1, + MaxChannelSize: 4, + Timeout: time.Duration(time.Second * 5), + }) + defer bridgerClient.Close() + type req struct { + Msg string + } + val, err := bridgerClient.Dispatch("/greetings", &req{Msg: "Hello, Dispatcher"}) + if err != nil { + panic(err) + } + response := &req{} + err = client.Unmarshal(val, response) + if err != nil { + panic(err) + } + log.Println("First Message : ", response.Msg) + time.Sleep(time.Minute * 5) + header := client.MetadataHeader{} + header["name"] = "gopher" + val, err = bridgerClient.Dispatch("/greetings/withname", &req{Msg: "I'm gopher"}, client.CallOptions{ + MetadataHeader: header, + }) + if err != nil { + panic(err) + } + response = &req{} + err = client.Unmarshal(val, response) + if err != nil { + panic(err) + } + log.Println("Second Message : ", response.Msg) + log.Println("Connection is Alive") +} diff --git a/examples/long/server.go b/examples/long/server.go new file mode 100644 index 0000000..f6ac940 --- /dev/null +++ b/examples/long/server.go @@ -0,0 +1,52 @@ +package main + +import ( + "github.com/sjy-dv/bridger/server" + "github.com/sjy-dv/bridger/server/dispatcher" + "github.com/sjy-dv/bridger/server/options" +) + +func main() { + bridger := server.New() + + bridger.Register("/greetings", greetings) + bridger.Register("/greetings/withname", + greetingsWithHeaderName, + "is using metadata api") + bridger.RegisterBridgerServer(&options.Options{ + Port: 50051, + ChainUnaryInterceptorLogger: true, + ChainStreamInterceptorLogger: true, + }) +} + +func greetings(dtx dispatcher.DispatchContext) *dispatcher.ResponseWriter { + var ( + req = struct { + Msg string + }{} + err error + ) + err = dtx.Bind(&req) + if err != nil { + return dtx.Error(err) + } + req.Msg = req.Msg + "\n" + "Me too.." + return dtx.Reply(&req) +} + +func greetingsWithHeaderName(dtx dispatcher.DispatchContext) *dispatcher.ResponseWriter { + var ( + req = struct { + Msg string + }{} + err error + ) + err = dtx.Bind(&req) + if err != nil { + return dtx.Error(err) + } + name := dtx.GetMetadata("name") + req.Msg = "Hello " + name + return dtx.Reply(&req) +} diff --git a/server/bootstrap.go b/server/bootstrap.go index 380285f..a80aa10 100644 --- a/server/bootstrap.go +++ b/server/bootstrap.go @@ -5,6 +5,7 @@ import ( "fmt" "net" "runtime" + "time" _ "google.golang.org/grpc/encoding/gzip" "google.golang.org/grpc/keepalive" @@ -18,19 +19,19 @@ import ( "google.golang.org/grpc" ) -type bridger struct { +type Bridger struct { *dispatcher.DispatchAPI } -func New() *bridger { +func New() *Bridger { dispatcher.DMap = make(map[string]func(ctx dispatcher.DispatchContext) *dispatcher.ResponseWriter) api := &dispatcher.DispatchAPI{} - return &bridger{ + return &Bridger{ api.NewDispatch(), } } -func (b *bridger) RegisterBridgerServer(opt *options.Options) error { +func (b *Bridger) RegisterBridgerServer(opt *options.Options) error { if opt.Port == 0 { return errors.New("port must be specified") } @@ -81,7 +82,22 @@ func (b *bridger) RegisterBridgerServer(opt *options.Options) error { b.Logger.WithField("action", "grpc_configure_server_interceptor") serverOptions = append(serverOptions, grpc.UnaryInterceptor(opt.ServerInterceptor)) } + if opt.EnforcementPolicyMinTime != 0 { + b.Logger.WithField("action", "grpc_configure_keepalive_enforcement_policy"). + Info("Be careful not to conflict with the client settings. Incorrect configuration can lead to the error [transport] Client received GoAway with error code ENHANCE_YOUR_CALM and debug data equal to ASCII 'too_many_pings'") + serverOptions = append(serverOptions, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ + MinTime: opt.EnforcementPolicyMinTime, + PermitWithoutStream: true, + })) + } else { + serverOptions = append(serverOptions, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ + MinTime: 5 * time.Second, + PermitWithoutStream: true, + })) + } if opt.KeepAliveTimeout != 0 && opt.KeepAliveTime != 0 { + b.Logger.WithField("action", "grpc_configure_keepalive"). + Info("The keepalive time should be the same as the clients.") serverOptions = append(serverOptions, grpc.KeepaliveParams(keepalive.ServerParameters{ Time: opt.KeepAliveTime, Timeout: opt.KeepAliveTimeout, diff --git a/server/options/options.go b/server/options/options.go index 368eae3..7124e9f 100644 --- a/server/options/options.go +++ b/server/options/options.go @@ -17,8 +17,9 @@ type Options struct { req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) - KeepAliveTimeout time.Duration - KeepAliveTime time.Duration + KeepAliveTimeout time.Duration + KeepAliveTime time.Duration + EnforcementPolicyMinTime time.Duration } const ( @@ -34,5 +35,6 @@ const ( ) const DefaultMsgSize = 104858000 // 10mb -const DefaultKeepAliveTimeout = 60 * time.Second +const DefaultKeepAliveTimeout = 10 * time.Second const DefaultKeepAlive = 60 * time.Second +const DefaultEnforcementPolicyMinTime = 5 * time.Second