Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
asmyasnikov committed Jul 25, 2024
1 parent 1a85c10 commit 0628d5d
Show file tree
Hide file tree
Showing 17 changed files with 316 additions and 1,020 deletions.
3 changes: 1 addition & 2 deletions driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
internalDiscovery "github.com/ydb-platform/ydb-go-sdk/v3/internal/discovery"
discoveryConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/discovery/config"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/dsn"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
internalQuery "github.com/ydb-platform/ydb-go-sdk/v3/internal/query"
queryConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/config"
internalRatelimiter "github.com/ydb-platform/ydb-go-sdk/v3/internal/ratelimiter"
Expand Down Expand Up @@ -488,7 +487,7 @@ func (d *Driver) connect(ctx context.Context) (err error) {

d.discovery = xsync.OnceValue(func() (*internalDiscovery.Client, error) {
return internalDiscovery.New(xcontext.ValueOnly(ctx),
d.pool.Get(endpoint.New(d.config.Endpoint())),
d.balancer,
discoveryConfig.New(
append(
// prepend common params from root config
Expand Down
166 changes: 90 additions & 76 deletions internal/balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"google.golang.org/grpc"

"github.com/ydb-platform/ydb-go-sdk/v3/config"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer/cluster"
balancerConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer/config"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/closer"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
Expand All @@ -26,8 +27,6 @@ import (
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
)

var ErrNoEndpoints = xerrors.Wrap(fmt.Errorf("no endpoints"))

type discoveryClient interface {
closer.Closer

Expand All @@ -40,9 +39,12 @@ type Balancer struct {
pool *conn.Pool
discoveryClient discoveryClient
discoveryRepeater repeater.Repeater
localDCDetector func(ctx context.Context, endpoints []endpoint.Endpoint) (string, error)

connectionsState atomic.Pointer[connectionsState]
cluster atomic.Pointer[cluster.Cluster]
conns xsync.Map[endpoint.Endpoint, conn.Conn]
banned xsync.Set[endpoint.Endpoint]

localDCDetector func(ctx context.Context, endpoints []endpoint.Endpoint) (string, error)

mu xsync.RWMutex
onApplyDiscoveredEndpoints []func(ctx context.Context, endpoints []endpoint.Info)
Expand Down Expand Up @@ -124,19 +126,48 @@ func (b *Balancer) clusterDiscoveryAttempt(ctx context.Context) (err error) {
}

func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, newest []endpoint.Endpoint, localDC string) {
var (
onDone = trace.DriverOnBalancerUpdate(
b.driverConfig.Trace(), &ctx,
stack.FunctionID(
"github.com/ydb-platform/ydb-go-sdk/3/internal/balancer.(*Balancer).applyDiscoveredEndpoints"),
b.config.DetectLocalDC,
)
previous = b.connections().All()
onDone := trace.DriverOnBalancerUpdate(
b.driverConfig.Trace(), &ctx,
stack.FunctionID(
"github.com/ydb-platform/ydb-go-sdk/3/internal/balancer.(*Balancer).applyDiscoveredEndpoints"),
b.config.DetectLocalDC,
)

state := cluster.New(newest,
cluster.WithFilter(func(e endpoint.Info) bool {
if b.config.Filter == nil {
return true
}

return b.config.Filter.Allow(balancerConfig.Info{SelfLocation: localDC}, e)
}),
cluster.WithFallback(b.config.AllowFallback),
)

previous := b.cluster.Swap(state)

_, added, dropped := xslices.Diff(previous.All(), newest, func(lhs, rhs endpoint.Endpoint) int {
return strings.Compare(lhs.Address(), rhs.Address())
})

for _, e := range dropped {
c, ok := b.conns.Extract(e)
if !ok {
panic("wrong balancer state")
}
b.pool.Put(ctx, c)
}

for _, e := range added {
cc, err := b.pool.Get(ctx, e)
if err != nil {
b.banned.Add(e)
} else {
b.conns.Set(e, cc)
}
}

defer func() {
_, added, dropped := xslices.Diff(previous, newest, func(lhs, rhs endpoint.Endpoint) int {
return strings.Compare(lhs.Address(), rhs.Address())
})
onDone(
xslices.Transform(newest, func(t endpoint.Endpoint) trace.EndpointInfo { return t }),
xslices.Transform(added, func(t endpoint.Endpoint) trace.EndpointInfo { return t }),
Expand All @@ -145,25 +176,13 @@ func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, newest []endpoi
)
}()

connections := endpointsToConnections(b.pool, newest)
for _, c := range connections {
b.pool.Allow(ctx, c)
c.Endpoint().Touch()
}

info := balancerConfig.Info{SelfLocation: localDC}
state := newConnectionsState(connections, b.config.Filter, info, b.config.AllowFallback)

endpointsInfo := make([]endpoint.Info, len(newest))
for i, e := range newest {
endpointsInfo[i] = e
}

b.connectionsState.Store(state)
endpoints := xslices.Transform(newest, func(e endpoint.Endpoint) endpoint.Info {
return e
})

b.mu.WithLock(func() {
for _, onApplyDiscoveredEndpoints := range b.onApplyDiscoveredEndpoints {
onApplyDiscoveredEndpoints(ctx, endpointsInfo)
onApplyDiscoveredEndpoints(ctx, endpoints)
}
})
}
Expand Down Expand Up @@ -212,18 +231,20 @@ func New(
onDone(finalErr)
}()

cc, err := pool.Get(ctx, endpoint.New(driverConfig.Endpoint()))
if err != nil {
return nil, xerrors.WithStackTrace(err)
}

b = &Balancer{
driverConfig: driverConfig,
pool: pool,
discoveryClient: internalDiscovery.New(ctx, pool.Get(
endpoint.New(driverConfig.Endpoint()),
), discoveryConfig),
config: balancerConfig.Config{},
driverConfig: driverConfig,
pool: pool,
discoveryClient: internalDiscovery.New(ctx, cc, discoveryConfig),
localDCDetector: detectLocalDC,
}

if config := driverConfig.Balancer(); config == nil {
b.config = balancerConfig.Config{}
} else {
if config := driverConfig.Balancer(); config != nil {
b.config = *config
}

Expand Down Expand Up @@ -289,10 +310,10 @@ func (b *Balancer) wrapCall(ctx context.Context, f func(ctx context.Context, cc
defer func() {
if err == nil {
if cc.GetState() == conn.Banned {
b.pool.Allow(ctx, cc)
b.banned.Remove(cc.Endpoint())
}
} else if xerrors.MustPessimizeEndpoint(err, b.driverConfig.ExcludeGRPCCodesForPessimization()...) {
b.pool.Ban(ctx, cc, err)
} else if conn.IsBadConn(err, b.driverConfig.ExcludeGRPCCodesForPessimization()...) {
b.banned.Add(cc.Endpoint())
}
}()

Expand All @@ -319,53 +340,46 @@ func (b *Balancer) wrapCall(ctx context.Context, f func(ctx context.Context, cc
return nil
}

func (b *Balancer) connections() *connectionsState {
return b.connectionsState.Load()
}

func (b *Balancer) getConn(ctx context.Context) (c conn.Conn, err error) {
onDone := trace.DriverOnBalancerChooseEndpoint(
b.driverConfig.Trace(), &ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/balancer.(*Balancer).getConn"),
var (
onDone = trace.DriverOnBalancerChooseEndpoint(
b.driverConfig.Trace(), &ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/balancer.(*Balancer).getConn"),
)
state = b.cluster.Load()
)

defer func() {
b.cluster.Store(state)

if b.discoveryRepeater != nil {
b.discoveryRepeater.Force()
}

if err == nil {
onDone(c.Endpoint(), nil)
} else {
onDone(nil, err)
}
}()

if err = ctx.Err(); err != nil {
return nil, xerrors.WithStackTrace(err)
}

var (
state = b.connections()
failedCount int
)

defer func() {
if failedCount*2 > state.PreferredCount() && b.discoveryRepeater != nil {
b.discoveryRepeater.Force()
for attempts := 1; ; attempts++ {
if err = ctx.Err(); err != nil {
return nil, xerrors.WithStackTrace(err)
}
}()

c, failedCount = state.GetConnection(ctx)
if c == nil {
return nil, xerrors.WithStackTrace(
fmt.Errorf("%w: cannot get connection from Balancer after %d attempts", ErrNoEndpoints, failedCount),
)
}
e, err := state.Next(ctx)
if err != nil {
return nil, xerrors.WithStackTrace(
fmt.Errorf("%w: cannot get connection from Balancer after %d attempts", cluster.ErrNoEndpoints, attempts),
)
}

return c, nil
}
cc, err := b.pool.Get(ctx, e)
if err == nil {
return cc, nil
}

func endpointsToConnections(p *conn.Pool, endpoints []endpoint.Endpoint) []conn.Conn {
conns := make([]conn.Conn, 0, len(endpoints))
for _, e := range endpoints {
conns = append(conns, p.Get(e))
b.banned.Add(e)
}

return conns
}
Loading

0 comments on commit 0628d5d

Please sign in to comment.