Skip to content

Commit

Permalink
fix:keepalive many pings
Browse files Browse the repository at this point in the history
  • Loading branch information
sjy-dv committed Jan 12, 2024
1 parent b51f600 commit d7d6618
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 30 deletions.
42 changes: 29 additions & 13 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package client

import (
"context"
"crypto/tls"
"fmt"
"sync/atomic"
"time"
Expand All @@ -13,17 +14,20 @@ import (
"github.com/vmihailenco/msgpack/v5"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
_ "google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/keepalive"
)

type bridgerAgent struct {
type BridgerAgent struct {
*gRpcClientPool
deadline *time.Duration
}

func RegisterBridgerClient(opt *options.Options) *bridgerAgent {
var reuseOpts = []grpc.DialOption{}

func RegisterBridgerClient(opt *options.Options) *BridgerAgent {
localLogging := logrus.New()
if opt.MinChannelSize > opt.MaxChannelSize {
panic("min channel size can't exceed max channel size")
Expand All @@ -44,7 +48,7 @@ func RegisterBridgerClient(opt *options.Options) *bridgerAgent {
}
return options.DefaultMsgSize
}()
bridger := &bridgerAgent{}
bridger := &BridgerAgent{}
agents := &gRpcClientPool{}
agents.poolSize = &atomic.Int32{}
agents.maxpoolsize = func() int {
Expand Down Expand Up @@ -91,14 +95,18 @@ func RegisterBridgerClient(opt *options.Options) *bridgerAgent {
},
MinConnectTimeout: time.Millisecond,
}),
}
clientOpts = append(clientOpts, []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(maxRecvMsgSize),
grpc.MaxCallSendMsgSize(maxSendMsgSize),
),
}...)
}
if opt.Credentials {
clientOpts = append(clientOpts, grpc.WithTransportCredentials(
credentials.NewTLS(&tls.Config{})))
} else {
clientOpts = append(clientOpts, grpc.WithTransportCredentials(
insecure.NewCredentials()))
}
if opt.ClientInterceptor != nil {
localLogging.WithField("action", "register-interceptor")
clientOpts = append(clientOpts, grpc.WithUnaryInterceptor(opt.ClientInterceptor))
Expand All @@ -120,12 +128,14 @@ func RegisterBridgerClient(opt *options.Options) *bridgerAgent {
}
for i := 0; i < opt.MinChannelSize; i++ {
status := true
conn, err := grpc.Dial(agents.addr, clientOpts...)
dialContext, cancel := context.WithTimeout(context.Background(), options.DialTimeout)
conn, err := grpc.DialContext(dialContext, agents.addr, clientOpts...)
if err != nil {
conn = nil
status = false
localLogging.WithField("action", "bridger-established").
Info(fmt.Sprintf("bridger-connection-%d is not established", i+1))
cancel()
return nil
}
if conn != nil {
Expand All @@ -138,15 +148,17 @@ func RegisterBridgerClient(opt *options.Options) *bridgerAgent {
status: status,
sessions: &atomic.Int32{},
}
cancel()
}
bridger = &bridgerAgent{
reuseOpts = clientOpts
bridger = &BridgerAgent{
agents,
&opt.Timeout,
}
return bridger
}

func (agent *bridgerAgent) Dispatch(domain string, v interface{}, callOptions ...CallOptions) ([]byte, error) {
func (agent *BridgerAgent) Dispatch(domain string, v interface{}, callOptions ...CallOptions) ([]byte, error) {
data, err := marshal(v)
if err != nil {
return nil, err
Expand All @@ -170,8 +182,12 @@ func (agent *bridgerAgent) Dispatch(domain string, v interface{}, callOptions ..
}
}

val, err := pb.NewBridgerClient(
agent.getConnection().connection).Dispatch(
cp := agent.getPool()
if cp == nil {
return nil, errors.New("connection is not connected")
}
defer agent.rollbackConnection(cp)
val, err := pb.NewBridgerClient(cp.connection).Dispatch(
ctx,
&pb.PayloadEmitter{
Payload: data,
Expand All @@ -187,7 +203,7 @@ func (agent *bridgerAgent) Dispatch(domain string, v interface{}, callOptions ..
return val.GetPayload(), nil
}

func (agents *bridgerAgent) Close() {
func (agents *BridgerAgent) Close() {
for _, agent := range agents.pool {
agent.connection.Close()
}
Expand Down
1 change: 1 addition & 0 deletions client/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type Options struct {
KeepAliveTimeout time.Duration
KeepAliveTime time.Duration
MaxSession int32
Credentials bool
}

const DefaultMsgSize = 104858000 // 10mb
Expand Down
17 changes: 7 additions & 10 deletions client/pool.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package client

import (
"context"
"sync"
"sync/atomic"
"time"

"github.com/sjy-dv/bridger/client/options"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

type gRpcClientPool struct {
Expand Down Expand Up @@ -59,27 +60,23 @@ func (proxy *gRpcClientPool) getConnection() *connectionpool {
return nil
}

func (proxy *gRpcClientPool) establishedConnection() *grpc.ClientConn {
conn, err := grpc.Dial(proxy.addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil
}
return conn
}

func (proxy *gRpcClientPool) addConnection() *connectionpool {
proxy.mu.Lock()
defer proxy.mu.Unlock()
connection := &connectionpool{}
conn, err := grpc.Dial(proxy.addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
dialContext, cancel := context.WithTimeout(context.Background(), options.DialTimeout)
conn, err := grpc.DialContext(dialContext, proxy.addr,
reuseOpts...)
if err != nil {
cancel()
return nil
}
connection.connection = conn
connection.status = true
connection.lastCall = time.Now()
proxy.pool = append(proxy.pool, connection)
proxy.poolSize.Add(1)
cancel()
return connection
}

Expand Down
53 changes: 53 additions & 0 deletions examples/long/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package main

import (
"log"
"time"

"github.com/sjy-dv/bridger/client"
"github.com/sjy-dv/bridger/client/options"
)

func main() {
/**
default value
if you want to singleton instance,
min&max channel size should be set 1
*/
bridgerClient := client.RegisterBridgerClient(&options.Options{
Addr: "127.0.0.1:50051",
MinChannelSize: 1,
MaxChannelSize: 4,
Timeout: time.Duration(time.Second * 5),
})
defer bridgerClient.Close()
type req struct {
Msg string
}
val, err := bridgerClient.Dispatch("/greetings", &req{Msg: "Hello, Dispatcher"})
if err != nil {
panic(err)
}
response := &req{}
err = client.Unmarshal(val, response)
if err != nil {
panic(err)
}
log.Println("First Message : ", response.Msg)
time.Sleep(time.Minute * 5)
header := client.MetadataHeader{}
header["name"] = "gopher"
val, err = bridgerClient.Dispatch("/greetings/withname", &req{Msg: "I'm gopher"}, client.CallOptions{
MetadataHeader: header,
})
if err != nil {
panic(err)
}
response = &req{}
err = client.Unmarshal(val, response)
if err != nil {
panic(err)
}
log.Println("Second Message : ", response.Msg)
log.Println("Connection is Alive")
}
52 changes: 52 additions & 0 deletions examples/long/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package main

import (
"github.com/sjy-dv/bridger/server"
"github.com/sjy-dv/bridger/server/dispatcher"
"github.com/sjy-dv/bridger/server/options"
)

func main() {
bridger := server.New()

bridger.Register("/greetings", greetings)
bridger.Register("/greetings/withname",
greetingsWithHeaderName,
"is using metadata api")
bridger.RegisterBridgerServer(&options.Options{
Port: 50051,
ChainUnaryInterceptorLogger: true,
ChainStreamInterceptorLogger: true,
})
}

func greetings(dtx dispatcher.DispatchContext) *dispatcher.ResponseWriter {
var (
req = struct {
Msg string
}{}
err error
)
err = dtx.Bind(&req)
if err != nil {
return dtx.Error(err)
}
req.Msg = req.Msg + "\n" + "Me too.."
return dtx.Reply(&req)
}

func greetingsWithHeaderName(dtx dispatcher.DispatchContext) *dispatcher.ResponseWriter {
var (
req = struct {
Msg string
}{}
err error
)
err = dtx.Bind(&req)
if err != nil {
return dtx.Error(err)
}
name := dtx.GetMetadata("name")
req.Msg = "Hello " + name
return dtx.Reply(&req)
}
24 changes: 20 additions & 4 deletions server/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"net"
"runtime"
"time"

_ "google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/keepalive"
Expand All @@ -18,19 +19,19 @@ import (
"google.golang.org/grpc"
)

type bridger struct {
type Bridger struct {
*dispatcher.DispatchAPI
}

func New() *bridger {
func New() *Bridger {
dispatcher.DMap = make(map[string]func(ctx dispatcher.DispatchContext) *dispatcher.ResponseWriter)
api := &dispatcher.DispatchAPI{}
return &bridger{
return &Bridger{
api.NewDispatch(),
}
}

func (b *bridger) RegisterBridgerServer(opt *options.Options) error {
func (b *Bridger) RegisterBridgerServer(opt *options.Options) error {
if opt.Port == 0 {
return errors.New("port must be specified")
}
Expand Down Expand Up @@ -81,7 +82,22 @@ func (b *bridger) RegisterBridgerServer(opt *options.Options) error {
b.Logger.WithField("action", "grpc_configure_server_interceptor")
serverOptions = append(serverOptions, grpc.UnaryInterceptor(opt.ServerInterceptor))
}
if opt.EnforcementPolicyMinTime != 0 {
b.Logger.WithField("action", "grpc_configure_keepalive_enforcement_policy").
Info("Be careful not to conflict with the client settings. Incorrect configuration can lead to the error [transport] Client received GoAway with error code ENHANCE_YOUR_CALM and debug data equal to ASCII 'too_many_pings'")
serverOptions = append(serverOptions, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: opt.EnforcementPolicyMinTime,
PermitWithoutStream: true,
}))
} else {
serverOptions = append(serverOptions, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: 5 * time.Second,
PermitWithoutStream: true,
}))
}
if opt.KeepAliveTimeout != 0 && opt.KeepAliveTime != 0 {
b.Logger.WithField("action", "grpc_configure_keepalive").
Info("The keepalive time should be the same as the clients.")
serverOptions = append(serverOptions, grpc.KeepaliveParams(keepalive.ServerParameters{
Time: opt.KeepAliveTime,
Timeout: opt.KeepAliveTimeout,
Expand Down
8 changes: 5 additions & 3 deletions server/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ type Options struct {
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler) (interface{}, error)
KeepAliveTimeout time.Duration
KeepAliveTime time.Duration
KeepAliveTimeout time.Duration
KeepAliveTime time.Duration
EnforcementPolicyMinTime time.Duration
}

const (
Expand All @@ -34,5 +35,6 @@ const (
)

const DefaultMsgSize = 104858000 // 10mb
const DefaultKeepAliveTimeout = 60 * time.Second
const DefaultKeepAliveTimeout = 10 * time.Second
const DefaultKeepAlive = 60 * time.Second
const DefaultEnforcementPolicyMinTime = 5 * time.Second

0 comments on commit d7d6618

Please sign in to comment.