Skip to content

Commit

Permalink
feat:add params (keepAlive & maxsession & backoff & other)
Browse files Browse the repository at this point in the history
  • Loading branch information
sjy-dv committed Jan 12, 2024
1 parent b6f5c90 commit b51f600
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 8 deletions.
52 changes: 45 additions & 7 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ import (
pb "github.com/sjy-dv/bridger/grpc/protocol/v0"
"github.com/vmihailenco/msgpack/v5"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/credentials/insecure"
_ "google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/keepalive"
)

type bridgerAgent struct {
Expand Down Expand Up @@ -56,6 +58,12 @@ func RegisterBridgerClient(opt *options.Options) *bridgerAgent {
opt.MaxChannelSize))
return opt.MaxChannelSize
}()
agents.maxsessions = func() int32 {
if opt.MaxSession == 0 {
return 100
}
return opt.MaxSession
}()
agents.minpoolsize = func() int {
if opt.MinChannelSize == 0 {
localLogging.WithField("action", "bridger-config-max-channel-size").
Expand All @@ -69,7 +77,21 @@ func RegisterBridgerClient(opt *options.Options) *bridgerAgent {
}()
agents.addr = opt.Addr
agents.pool = make([]*connectionpool, opt.MinChannelSize)
clientOpts := []grpc.DialOption{}
clientOpts := []grpc.DialOption{
grpc.WithBlock(),
grpc.FailOnNonTempDialError(true),
grpc.WithReturnConnectionError(),
grpc.WithDisableRetry(),
grpc.WithConnectParams(grpc.ConnectParams{
Backoff: backoff.Config{
BaseDelay: 100 * time.Millisecond,
Multiplier: 1.6,
Jitter: 0.2,
MaxDelay: 3 * time.Second,
},
MinConnectTimeout: time.Millisecond,
}),
}
clientOpts = append(clientOpts, []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(
Expand All @@ -78,8 +100,24 @@ func RegisterBridgerClient(opt *options.Options) *bridgerAgent {
),
}...)
if opt.ClientInterceptor != nil {
localLogging.WithField("action", "register-interceptor")
clientOpts = append(clientOpts, grpc.WithUnaryInterceptor(opt.ClientInterceptor))
}
if opt.KeepAliveTimeout != 0 && opt.KeepAliveTime != 0 {
localLogging.WithField("action", "configure-keepalive").
Info("The keepalive time should be the same as the server.")
clientOpts = append(clientOpts, grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: opt.KeepAliveTime,
Timeout: opt.KeepAliveTimeout,
PermitWithoutStream: true,
}))
} else {
clientOpts = append(clientOpts, grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: options.DefaultKeepAlive,
Timeout: options.DefaultKeepAliveTimeout,
PermitWithoutStream: true,
}))
}
for i := 0; i < opt.MinChannelSize; i++ {
status := true
conn, err := grpc.Dial(agents.addr, clientOpts...)
Expand Down Expand Up @@ -108,24 +146,24 @@ func RegisterBridgerClient(opt *options.Options) *bridgerAgent {
return bridger
}

func (agent *bridgerAgent) Dispatch(domain string, v interface{}, callOPtions ...CallOptions) ([]byte, error) {
func (agent *bridgerAgent) Dispatch(domain string, v interface{}, callOptions ...CallOptions) ([]byte, error) {
data, err := marshal(v)
if err != nil {
return nil, err
}
ctx := context.Background()
if len(callOPtions) != 0 {
if callOPtions[0].MetadataHeader != nil {
md := callOPtions[0].MetadataHeader
if len(callOptions) != 0 {
if callOptions[0].MetadataHeader != nil {
md := callOptions[0].MetadataHeader
for k, v := range md {
ctx = appendMetaData(ctx, k, v)
}
}
}
var cancel context.CancelFunc
if agent.deadline != nil {
if len(callOPtions) > 0 && callOPtions[0].Ctx != nil {
ctx = callOPtions[0].Ctx
if len(callOptions) > 0 && callOptions[0].Ctx != nil {
ctx = callOptions[0].Ctx
} else {
ctx, cancel = context.WithTimeout(ctx, *agent.deadline)
defer cancel()
Expand Down
7 changes: 7 additions & 0 deletions client/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ type Options struct {
method string,
req, reply interface{},
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error
KeepAliveTimeout time.Duration
KeepAliveTime time.Duration
MaxSession int32
}

const DefaultMsgSize = 104858000 // 10mb
const DialTimeout = 60 * time.Second
const DefaultKeepAliveTimeout = 60 * time.Second
const DefaultKeepAlive = 60 * time.Second
const DefaultMaxSession = 100
3 changes: 2 additions & 1 deletion client/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type gRpcClientPool struct {
minpoolsize int
maxpoolsize int
poolSize *atomic.Int32
maxsessions int32
}

type connectionpool struct {
Expand Down Expand Up @@ -50,7 +51,7 @@ func (proxy *gRpcClientPool) getConnection() *connectionpool {
proxy.mu.RLock()
defer proxy.mu.RUnlock()
for i, wrapper := range proxy.pool {
if wrapper.sessions.Load() <= 100 && wrapper.status {
if wrapper.sessions.Load() <= proxy.maxsessions && wrapper.status {
proxy.pool[i].lastCall = time.Now()
return proxy.pool[i]
}
Expand Down
12 changes: 12 additions & 0 deletions server/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"runtime"

_ "google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/keepalive"

grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"
grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
Expand Down Expand Up @@ -80,6 +81,17 @@ func (b *bridger) RegisterBridgerServer(opt *options.Options) error {
b.Logger.WithField("action", "grpc_configure_server_interceptor")
serverOptions = append(serverOptions, grpc.UnaryInterceptor(opt.ServerInterceptor))
}
if opt.KeepAliveTimeout != 0 && opt.KeepAliveTime != 0 {
serverOptions = append(serverOptions, grpc.KeepaliveParams(keepalive.ServerParameters{
Time: opt.KeepAliveTime,
Timeout: opt.KeepAliveTimeout,
}))
} else {
serverOptions = append(serverOptions, grpc.KeepaliveParams(keepalive.ServerParameters{
Time: options.DefaultKeepAlive,
Timeout: options.DefaultKeepAliveTimeout,
}))
}
dispatch := rpcDispatcher{}
dispatch.DispatchService = &dispatchService{dispatch}
grpcServer := grpc.NewServer(serverOptions...)
Expand Down
5 changes: 5 additions & 0 deletions server/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package options

import (
"context"
"time"

"google.golang.org/grpc"
)
Expand All @@ -16,6 +17,8 @@ type Options struct {
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler) (interface{}, error)
KeepAliveTimeout time.Duration
KeepAliveTime time.Duration
}

const (
Expand All @@ -31,3 +34,5 @@ const (
)

const DefaultMsgSize = 104858000 // 10mb
const DefaultKeepAliveTimeout = 60 * time.Second
const DefaultKeepAlive = 60 * time.Second

0 comments on commit b51f600

Please sign in to comment.