Skip to content

Commit

Permalink
Merge pull request #890 from ydb-platform/balancer-config-name
Browse files Browse the repository at this point in the history
balancer config name
  • Loading branch information
asmyasnikov authored Nov 9, 2023
2 parents 4e5ae05 + d485a07 commit 6373d76
Show file tree
Hide file tree
Showing 10 changed files with 162 additions and 75 deletions.
77 changes: 59 additions & 18 deletions balancers/balancers.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package balancers

import (
"sort"
"strings"

balancerConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer/config"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xstring"
)

// Deprecated: RoundRobin is RandomChoice now
Expand All @@ -22,14 +24,22 @@ func SingleConn() *balancerConfig.Config {
}
}

type filterLocalDC struct{}

func (filterLocalDC) Allow(info balancerConfig.Info, c conn.Conn) bool {
return c.Endpoint().Location() == info.SelfLocation
}

func (filterLocalDC) String() string {
return "LocalDC"
}

// PreferLocalDC creates balancer which use endpoints only in location such as initial endpoint location
// Balancer "balancer" defines balancing algorithm between endpoints selected with filter by location
// PreferLocalDC balancer try to autodetect local DC from client side.
func PreferLocalDC(balancer *balancerConfig.Config) *balancerConfig.Config {
balancer.IsPreferConn = func(info balancerConfig.Info, c conn.Conn) bool {
return c.Endpoint().Location() == info.SelfLocation
}
balancer.DetectlocalDC = true
balancer.Filter = filterLocalDC{}
balancer.DetectLocalDC = true
return balancer
}

Expand All @@ -38,10 +48,38 @@ func PreferLocalDC(balancer *balancerConfig.Config) *balancerConfig.Config {
// If filter returned zero endpoints from all discovery endpoints list - used all endpoint instead
func PreferLocalDCWithFallBack(balancer *balancerConfig.Config) *balancerConfig.Config {
balancer = PreferLocalDC(balancer)
balancer.AllowFalback = true
balancer.AllowFallback = true
return balancer
}

type filterLocations []string

func (locations filterLocations) Allow(_ balancerConfig.Info, c conn.Conn) bool {
location := strings.ToUpper(c.Endpoint().Location())
for _, l := range locations {
if location == l {
return true
}
}
return false
}

func (locations filterLocations) String() string {
buffer := xstring.Buffer()
defer buffer.Free()

buffer.WriteString("Locations{")
for i, l := range locations {
if i != 0 {
buffer.WriteByte(',')
}
buffer.WriteString(l)
}
buffer.WriteByte('}')

return buffer.String()
}

// PreferLocations creates balancer which use endpoints only in selected locations (such as "ABC", "DEF", etc.)
// Balancer "balancer" defines balancing algorithm between endpoints selected with filter by location
func PreferLocations(balancer *balancerConfig.Config, locations ...string) *balancerConfig.Config {
Expand All @@ -51,15 +89,8 @@ func PreferLocations(balancer *balancerConfig.Config, locations ...string) *bala
for i := range locations {
locations[i] = strings.ToUpper(locations[i])
}
balancer.IsPreferConn = func(_ balancerConfig.Info, c conn.Conn) bool {
location := strings.ToUpper(c.Endpoint().Location())
for _, l := range locations {
if location == l {
return true
}
}
return false
}
sort.Strings(locations)
balancer.Filter = filterLocations(locations)
return balancer
}

Expand All @@ -68,7 +99,7 @@ func PreferLocations(balancer *balancerConfig.Config, locations ...string) *bala
// If filter returned zero endpoints from all discovery endpoints list - used all endpoint instead
func PreferLocationsWithFallback(balancer *balancerConfig.Config, locations ...string) *balancerConfig.Config {
balancer = PreferLocations(balancer, locations...)
balancer.AllowFalback = true
balancer.AllowFallback = true
return balancer
}

Expand All @@ -82,12 +113,22 @@ type Endpoint interface {
LocalDC() bool
}

type filterFunc func(info balancerConfig.Info, c conn.Conn) bool

func (p filterFunc) Allow(info balancerConfig.Info, c conn.Conn) bool {
return p(info, c)
}

func (p filterFunc) String() string {
return "Custom"
}

// Prefer creates balancer which use endpoints by filter
// Balancer "balancer" defines balancing algorithm between endpoints selected with filter
func Prefer(balancer *balancerConfig.Config, filter func(endpoint Endpoint) bool) *balancerConfig.Config {
balancer.IsPreferConn = func(_ balancerConfig.Info, c conn.Conn) bool {
balancer.Filter = filterFunc(func(_ balancerConfig.Info, c conn.Conn) bool {
return filter(c.Endpoint())
}
})
return balancer
}

Expand All @@ -96,7 +137,7 @@ func Prefer(balancer *balancerConfig.Config, filter func(endpoint Endpoint) bool
// If filter returned zero endpoints from all discovery endpoints list - used all endpoint instead
func PreferWithFallback(balancer *balancerConfig.Config, filter func(endpoint Endpoint) bool) *balancerConfig.Config {
balancer = Prefer(balancer, filter)
balancer.AllowFalback = true
balancer.AllowFallback = true
return balancer
}

Expand Down
14 changes: 7 additions & 7 deletions balancers/balancers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestPreferLocalDC(t *testing.T) {
&mock.Conn{AddrField: "3", State: conn.Online, LocationField: "2"},
}
rr := PreferLocalDC(RandomChoice())
require.False(t, rr.AllowFalback)
require.False(t, rr.AllowFallback)
require.Equal(t, []conn.Conn{conns[1], conns[2]}, applyPreferFilter(balancerConfig.Info{SelfLocation: "2"}, rr, conns))
}

Expand All @@ -28,7 +28,7 @@ func TestPreferLocalDCWithFallBack(t *testing.T) {
&mock.Conn{AddrField: "3", State: conn.Online, LocationField: "2"},
}
rr := PreferLocalDCWithFallBack(RandomChoice())
require.True(t, rr.AllowFalback)
require.True(t, rr.AllowFallback)
require.Equal(t, []conn.Conn{conns[1], conns[2]}, applyPreferFilter(balancerConfig.Info{SelfLocation: "2"}, rr, conns))
}

Expand All @@ -40,7 +40,7 @@ func TestPreferLocations(t *testing.T) {
}

rr := PreferLocations(RandomChoice(), "zero", "two")
require.False(t, rr.AllowFalback)
require.False(t, rr.AllowFallback)
require.Equal(t, []conn.Conn{conns[0], conns[2]}, applyPreferFilter(balancerConfig.Info{}, rr, conns))
}

Expand All @@ -52,17 +52,17 @@ func TestPreferLocationsWithFallback(t *testing.T) {
}

rr := PreferLocationsWithFallback(RandomChoice(), "zero", "two")
require.True(t, rr.AllowFalback)
require.True(t, rr.AllowFallback)
require.Equal(t, []conn.Conn{conns[0], conns[2]}, applyPreferFilter(balancerConfig.Info{}, rr, conns))
}

func applyPreferFilter(info balancerConfig.Info, b *balancerConfig.Config, conns []conn.Conn) []conn.Conn {
if b.IsPreferConn == nil {
b.IsPreferConn = func(info balancerConfig.Info, c conn.Conn) bool { return true }
if b.Filter == nil {
b.Filter = filterFunc(func(info balancerConfig.Info, c conn.Conn) bool { return true })
}
res := make([]conn.Conn, 0, len(conns))
for _, c := range conns {
if b.IsPreferConn(info, c) {
if b.Filter.Allow(info, c) {
res = append(res, c)
}
}
Expand Down
8 changes: 4 additions & 4 deletions balancers/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ const (
type preferType string

const (
preferLocalDC = preferType("local_dc")
preferLocations = preferType("locations")
preferTypeLocalDC = preferType("local_dc")
preferTypeLocations = preferType("locations")
)

type balancersConfig struct {
Expand Down Expand Up @@ -88,12 +88,12 @@ func CreateFromConfig(s string) (*balancerConfig.Config, error) {
}

switch c.Prefer {
case preferLocalDC:
case preferTypeLocalDC:
if c.Fallback {
return PreferLocalDCWithFallBack(b), nil
}
return PreferLocalDC(b), nil
case preferLocations:
case preferTypeLocations:
if len(c.Locations) == 0 {
return nil, xerrors.WithStackTrace(fmt.Errorf("empty locations list in balancer '%s' config", c.Type))
}
Expand Down
32 changes: 16 additions & 16 deletions balancers/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ func TestFromConfig(t *testing.T) {
"prefer": "local_dc"
}`,
res: balancerConfig.Config{
DetectlocalDC: true,
IsPreferConn: func(info balancerConfig.Info, c conn.Conn) bool {
DetectLocalDC: true,
Filter: filterFunc(func(info balancerConfig.Info, c conn.Conn) bool {
// some non nil func
return false
},
}),
},
},
{
Expand All @@ -93,12 +93,12 @@ func TestFromConfig(t *testing.T) {
"fallback": true
}`,
res: balancerConfig.Config{
AllowFalback: true,
DetectlocalDC: true,
IsPreferConn: func(info balancerConfig.Info, c conn.Conn) bool {
AllowFallback: true,
DetectLocalDC: true,
Filter: filterFunc(func(info balancerConfig.Info, c conn.Conn) bool {
// some non nil func
return false
},
}),
},
},
{
Expand All @@ -109,10 +109,10 @@ func TestFromConfig(t *testing.T) {
"locations": ["AAA", "BBB", "CCC"]
}`,
res: balancerConfig.Config{
IsPreferConn: func(info balancerConfig.Info, c conn.Conn) bool {
Filter: filterFunc(func(info balancerConfig.Info, c conn.Conn) bool {
// some non nil func
return false
},
}),
},
},
{
Expand All @@ -124,11 +124,11 @@ func TestFromConfig(t *testing.T) {
"fallback": true
}`,
res: balancerConfig.Config{
AllowFalback: true,
IsPreferConn: func(info balancerConfig.Info, c conn.Conn) bool {
AllowFallback: true,
Filter: filterFunc(func(info balancerConfig.Info, c conn.Conn) bool {
// some non nil func
return false
},
}),
},
},
} {
Expand All @@ -155,10 +155,10 @@ func TestFromConfig(t *testing.T) {
}

// function pointers can check equal to nil only
if tt.res.IsPreferConn != nil {
require.NotNil(t, b.IsPreferConn)
b.IsPreferConn = nil
tt.res.IsPreferConn = nil
if tt.res.Filter != nil {
require.NotNil(t, b.Filter)
b.Filter = nil
tt.res.Filter = nil
}

require.Equal(t, &tt.res, b)
Expand Down
8 changes: 4 additions & 4 deletions internal/balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (b *Balancer) clusterDiscoveryAttempt(ctx context.Context) (err error) {
return xerrors.WithStackTrace(err)
}

if b.balancerConfig.DetectlocalDC {
if b.balancerConfig.DetectLocalDC {
localDC, err = b.localDCDetector(ctx, endpoints)
if err != nil {
return xerrors.WithStackTrace(err)
Expand All @@ -128,7 +128,7 @@ func (b *Balancer) clusterDiscoveryAttempt(ctx context.Context) (err error) {

func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, endpoints []endpoint.Endpoint, localDC string) {
onDone := trace.DriverOnBalancerUpdate(
b.driverConfig.Trace(), &ctx, stack.FunctionID(0), b.balancerConfig.DetectlocalDC,
b.driverConfig.Trace(), &ctx, stack.FunctionID(0), b.balancerConfig.DetectLocalDC,
)
defer func() {
nodes := make([]trace.EndpointInfo, 0, len(endpoints))
Expand All @@ -145,7 +145,7 @@ func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, endpoints []end
}

info := balancerConfig.Info{SelfLocation: localDC}
state := newConnectionsState(connections, b.balancerConfig.IsPreferConn, info, b.balancerConfig.AllowFalback)
state := newConnectionsState(connections, b.balancerConfig.Filter, info, b.balancerConfig.AllowFallback)

endpointsInfo := make([]endpoint.Info, len(endpoints))
for i, e := range endpoints {
Expand Down Expand Up @@ -187,7 +187,7 @@ func New(
) (b *Balancer, finalErr error) {
var (
onDone = trace.DriverOnBalancerInit(
driverConfig.Trace(), &ctx, stack.FunctionID(0),
driverConfig.Trace(), &ctx, stack.FunctionID(0), driverConfig.Balancer().String(),
)
discoveryConfig = discoveryConfig.New(append(opts,
discoveryConfig.With(driverConfig.Common),
Expand Down
44 changes: 39 additions & 5 deletions internal/balancer/config/routerconfig.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,52 @@
package config

import "github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
import (
"fmt"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xstring"
)

// Dedicated package need for prevent cyclo dependencies config -> balancer -> config

type Config struct {
IsPreferConn PreferConnFunc
AllowFalback bool
Filter Filter
AllowFallback bool
SingleConn bool
DetectlocalDC bool
DetectLocalDC bool
}

func (c Config) String() string {
if c.SingleConn {
return "SingleConn"
}

buffer := xstring.Buffer()
defer buffer.Free()

buffer.WriteString("RandomChoice{")

buffer.WriteString("DetectLocalDC=")
fmt.Fprintf(buffer, "%t", c.DetectLocalDC)

buffer.WriteString(",AllowFallback=")
fmt.Fprintf(buffer, "%t", c.AllowFallback)

if c.Filter != nil {
buffer.WriteString(",Filter=")
fmt.Fprint(buffer, c.Filter.String())
}

buffer.WriteByte('}')

return buffer.String()
}

type Info struct {
SelfLocation string
}

type PreferConnFunc func(info Info, c conn.Conn) bool
type Filter interface {
Allow(info Info, c conn.Conn) bool
String() string
}
Loading

0 comments on commit 6373d76

Please sign in to comment.