Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring #1174

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 26 additions & 97 deletions balancers/balancers.go
Original file line number Diff line number Diff line change
@@ -1,158 +1,87 @@
package balancers

import (
"sort"
"strings"

balancerConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer/config"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xstring"
)

// Deprecated: RoundRobin is an alias to RandomChoice now
// Will be removed after Oct 2024.
// Read about versioning policy: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#deprecated
func RoundRobin() *balancerConfig.Config {
return &balancerConfig.Config{}
return balancerConfig.New()
}

func RandomChoice() *balancerConfig.Config {
return &balancerConfig.Config{}
return balancerConfig.New()
}

func SingleConn() *balancerConfig.Config {
return &balancerConfig.Config{
SingleConn: true,
}
}

type filterLocalDC struct{}

func (filterLocalDC) Allow(info balancerConfig.Info, c conn.Conn) bool {
return c.Endpoint().Location() == info.SelfLocation
}

func (filterLocalDC) String() string {
return "LocalDC"
return balancerConfig.New(balancerConfig.UseSingleConn())
}

// PreferLocalDC creates balancer which use endpoints only in location such as initial endpoint location
// Balancer "balancer" defines balancing algorithm between endpoints selected with filter by location
// PreferLocalDC balancer try to autodetect local DC from client side.
func PreferLocalDC(balancer *balancerConfig.Config) *balancerConfig.Config {
balancer.Filter = filterLocalDC{}
balancer.DetectLocalDC = true

return balancer
return balancer.With(
balancerConfig.FilterLocalDC(),
balancerConfig.DetectLocalDC(),
)
}

// PreferLocalDCWithFallBack creates balancer which use endpoints only in location such as initial endpoint location
// Balancer "balancer" defines balancing algorithm between endpoints selected with filter by location
// If filter returned zero endpoints from all discovery endpoints list - used all endpoint instead
func PreferLocalDCWithFallBack(balancer *balancerConfig.Config) *balancerConfig.Config {
balancer = PreferLocalDC(balancer)
balancer.AllowFallback = true

return balancer
}

type filterLocations []string

func (locations filterLocations) Allow(_ balancerConfig.Info, c conn.Conn) bool {
location := strings.ToUpper(c.Endpoint().Location())
for _, l := range locations {
if location == l {
return true
}
}

return false
}

func (locations filterLocations) String() string {
buffer := xstring.Buffer()
defer buffer.Free()

buffer.WriteString("Locations{")
for i, l := range locations {
if i != 0 {
buffer.WriteByte(',')
}
buffer.WriteString(l)
}
buffer.WriteByte('}')

return buffer.String()
return PreferLocalDC(balancer).With(balancerConfig.AllowFallback())
}

// PreferLocations creates balancer which use endpoints only in selected locations (such as "ABC", "DEF", etc.)
// Balancer "balancer" defines balancing algorithm between endpoints selected with filter by location
func PreferLocations(balancer *balancerConfig.Config, locations ...string) *balancerConfig.Config {
if len(locations) == 0 {
panic("empty list of locations")
}
for i := range locations {
locations[i] = strings.ToUpper(locations[i])
}
sort.Strings(locations)
balancer.Filter = filterLocations(locations)

return balancer
return balancer.With(balancerConfig.FilterLocations(locations...))
}

// PreferLocationsWithFallback creates balancer which use endpoints only in selected locations
// Balancer "balancer" defines balancing algorithm between endpoints selected with filter by location
// If filter returned zero endpoints from all discovery endpoints list - used all endpoint instead
func PreferLocationsWithFallback(balancer *balancerConfig.Config, locations ...string) *balancerConfig.Config {
balancer = PreferLocations(balancer, locations...)
balancer.AllowFallback = true

return balancer
return balancer.With(
balancerConfig.FilterLocations(locations...),
balancerConfig.AllowFallback(),
)
}

type Endpoint interface {
NodeID() uint32
Address() string
Location() string

// Deprecated: LocalDC check "local" by compare endpoint location with discovery "selflocation" field.
// It work good only if connection url always point to local dc.
// Will be removed after Oct 2024.
// Read about versioning policy: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#deprecated
LocalDC() bool
}

type filterFunc func(info balancerConfig.Info, c conn.Conn) bool

func (p filterFunc) Allow(info balancerConfig.Info, c conn.Conn) bool {
return p(info, c)
}

func (p filterFunc) String() string {
return "Custom"
}

// Prefer creates balancer which use endpoints by filter
// Balancer "balancer" defines balancing algorithm between endpoints selected with filter
func Prefer(balancer *balancerConfig.Config, filter func(endpoint Endpoint) bool) *balancerConfig.Config {
balancer.Filter = filterFunc(func(_ balancerConfig.Info, c conn.Conn) bool {
return filter(c.Endpoint())
})

return balancer
return balancer.With(
balancerConfig.FilterFunc(func(_ balancerConfig.Info, c conn.Info) bool {
return filter(c)
}),
)
}

// PreferWithFallback creates balancer which use endpoints by filter
// Balancer "balancer" defines balancing algorithm between endpoints selected with filter
// If filter returned zero endpoints from all discovery endpoints list - used all endpoint instead
func PreferWithFallback(balancer *balancerConfig.Config, filter func(endpoint Endpoint) bool) *balancerConfig.Config {
balancer = Prefer(balancer, filter)
balancer.AllowFallback = true

return balancer
func PreferWithFallback(balancer *balancerConfig.Config, filter func(endpoint conn.Info) bool) *balancerConfig.Config {
return balancer.With(
balancerConfig.FilterFunc(func(_ balancerConfig.Info, c conn.Info) bool {
return filter(c)
}),
balancerConfig.AllowFallback(),
)
}

// Default balancer used by default
func Default() *balancerConfig.Config {
return RandomChoice()
return balancerConfig.New()
}
58 changes: 28 additions & 30 deletions balancers/balancers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,65 +4,63 @@ import (
"testing"

"github.com/stretchr/testify/require"
"google.golang.org/grpc/connectivity"

balancerConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer/config"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/mock"
)

func TestPreferLocalDC(t *testing.T) {
conns := []conn.Conn{
&mock.Conn{AddrField: "1", LocationField: "1"},
&mock.Conn{AddrField: "2", State: conn.Online, LocationField: "2"},
&mock.Conn{AddrField: "3", State: conn.Online, LocationField: "2"},
conns := []conn.Info{
&mock.Conn{EndpointField: &mock.Endpoint{AddressField: "1", LocationField: "1"}},
&mock.Conn{EndpointField: &mock.Endpoint{AddressField: "2", LocationField: "2"}, StateField: connectivity.Ready},
&mock.Conn{EndpointField: &mock.Endpoint{AddressField: "3", LocationField: "2"}, StateField: connectivity.Ready},
}
rr := PreferLocalDC(RandomChoice())
require.False(t, rr.AllowFallback)
require.Equal(t, []conn.Conn{conns[1], conns[2]}, applyPreferFilter(balancerConfig.Info{SelfLocation: "2"}, rr, conns))
require.False(t, rr.AllowFallback())
require.Equal(t, []conn.Info{conns[1], conns[2]}, applyPreferFilter(balancerConfig.Info{SelfLocation: "2"}, rr, conns))
}

func TestPreferLocalDCWithFallBack(t *testing.T) {
conns := []conn.Conn{
&mock.Conn{AddrField: "1", LocationField: "1"},
&mock.Conn{AddrField: "2", State: conn.Online, LocationField: "2"},
&mock.Conn{AddrField: "3", State: conn.Online, LocationField: "2"},
conns := []conn.Info{
&mock.Conn{EndpointField: &mock.Endpoint{AddressField: "1", LocationField: "1"}},
&mock.Conn{EndpointField: &mock.Endpoint{AddressField: "2", LocationField: "2"}, StateField: connectivity.Ready},
&mock.Conn{EndpointField: &mock.Endpoint{AddressField: "3", LocationField: "2"}, StateField: connectivity.Ready},
}
rr := PreferLocalDCWithFallBack(RandomChoice())
require.True(t, rr.AllowFallback)
require.Equal(t, []conn.Conn{conns[1], conns[2]}, applyPreferFilter(balancerConfig.Info{SelfLocation: "2"}, rr, conns))
require.True(t, rr.AllowFallback())
require.Equal(t, []conn.Info{conns[1], conns[2]}, applyPreferFilter(balancerConfig.Info{SelfLocation: "2"}, rr, conns))
}

func TestPreferLocations(t *testing.T) {
conns := []conn.Conn{
&mock.Conn{AddrField: "1", LocationField: "zero", State: conn.Online},
&mock.Conn{AddrField: "2", State: conn.Online, LocationField: "one"},
&mock.Conn{AddrField: "3", State: conn.Online, LocationField: "two"},
conns := []conn.Info{
&mock.Conn{EndpointField: &mock.Endpoint{AddressField: "1", LocationField: "zero"}, StateField: connectivity.Ready},
&mock.Conn{EndpointField: &mock.Endpoint{AddressField: "2", LocationField: "one"}, StateField: connectivity.Ready},
&mock.Conn{EndpointField: &mock.Endpoint{AddressField: "3", LocationField: "two"}, StateField: connectivity.Ready},
}

rr := PreferLocations(RandomChoice(), "zero", "two")
require.False(t, rr.AllowFallback)
require.Equal(t, []conn.Conn{conns[0], conns[2]}, applyPreferFilter(balancerConfig.Info{}, rr, conns))
require.False(t, rr.AllowFallback())
require.Equal(t, []conn.Info{conns[0], conns[2]}, applyPreferFilter(balancerConfig.Info{}, rr, conns))
}

func TestPreferLocationsWithFallback(t *testing.T) {
conns := []conn.Conn{
&mock.Conn{AddrField: "1", LocationField: "zero", State: conn.Online},
&mock.Conn{AddrField: "2", State: conn.Online, LocationField: "one"},
&mock.Conn{AddrField: "3", State: conn.Online, LocationField: "two"},
conns := []conn.Info{
&mock.Conn{EndpointField: &mock.Endpoint{AddressField: "1", LocationField: "zero"}, StateField: connectivity.Ready},
&mock.Conn{EndpointField: &mock.Endpoint{AddressField: "2", LocationField: "one"}, StateField: connectivity.Ready},
&mock.Conn{EndpointField: &mock.Endpoint{AddressField: "3", LocationField: "two"}, StateField: connectivity.Ready},
}

rr := PreferLocationsWithFallback(RandomChoice(), "zero", "two")
require.True(t, rr.AllowFallback)
require.Equal(t, []conn.Conn{conns[0], conns[2]}, applyPreferFilter(balancerConfig.Info{}, rr, conns))
require.True(t, rr.AllowFallback())
require.Equal(t, []conn.Info{conns[0], conns[2]}, applyPreferFilter(balancerConfig.Info{}, rr, conns))
}

func applyPreferFilter(info balancerConfig.Info, b *balancerConfig.Config, conns []conn.Conn) []conn.Conn {
if b.Filter == nil {
b.Filter = filterFunc(func(info balancerConfig.Info, c conn.Conn) bool { return true })
}
res := make([]conn.Conn, 0, len(conns))
func applyPreferFilter(info balancerConfig.Info, b *balancerConfig.Config, conns []conn.Info) []conn.Info {
res := make([]conn.Info, 0, len(conns))
for _, c := range conns {
if b.Filter.Allow(info, c) {
if b.Filter(info, c) {
res = append(res, c)
}
}
Expand Down
Loading
Loading