[Disk Manager] clients/nfs/endpointPicker should wait until node becomes ready (#1852)
SvartMetal authored Aug 27, 2024
1 parent 2e9ae2a commit 5e6dfa5
Showing 6 changed files with 59 additions and 31 deletions.
@@ -170,7 +170,7 @@ func writeBlocksToSession(

 ////////////////////////////////////////////////////////////////////////////////

-func TestCreate(t *testing.T) {
+func TestCreateDisk(t *testing.T) {
     ctx := newContext()
     client := newClient(t, ctx)

57 changes: 36 additions & 21 deletions cloud/disk_manager/internal/pkg/clients/nfs/endpoint_picker.go
@@ -8,6 +8,7 @@ import (
     "github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/common"
     "github.com/ydb-platform/nbs/cloud/filestore/public/api/protos"
     nfs_client "github.com/ydb-platform/nbs/cloud/filestore/public/sdk/go/client"
+    "github.com/ydb-platform/nbs/cloud/tasks/logging"
 )

 ////////////////////////////////////////////////////////////////////////////////
@@ -23,21 +24,19 @@ var endpointPickerCheckPeriod = 5 * time.Second
 type endpointPicker struct {
     endpoints []string
     healthyEndpoints []string
-    mutex sync.RWMutex
+    mutex sync.Mutex
+    markedAsHealthy common.Cond
 }

 func newEndpointPicker(
     ctx context.Context,
     endpoints []string,
 ) *endpointPicker {

-    healthyEndpoints := make([]string, len(endpoints))
-    copy(healthyEndpoints, endpoints)
-
     p := &endpointPicker{
-        endpoints:        endpoints,
-        healthyEndpoints: healthyEndpoints,
+        endpoints: endpoints,
     }
+    p.markedAsHealthy = common.NewCond(&p.mutex)

     go func() {
         ticker := time.NewTicker(endpointPickerCheckPeriod)
@@ -47,7 +46,7 @@ func newEndpointPicker(
             select {
             case <-ticker.C:
                 for _, endpoint := range endpoints {
-                    p.checkEndpoint(ctx, endpoint)
+                    p.checkHealth(ctx, endpoint)
                 }
             case <-ctx.Done():
                 return
@@ -57,8 +56,8 @@ func newEndpointPicker(
     return p
 }

-func (p *endpointPicker) checkEndpoint(ctx context.Context, endpoint string) {
-    client, err := nfs_client.NewGrpcEndpointClient(
+func (p *endpointPicker) checkHealth(ctx context.Context, endpoint string) {
+    client, err := nfs_client.NewGrpcClient(
         &nfs_client.GrpcClientOpts{
             Endpoint: endpoint,
             // Credentials: not needed here
@@ -67,44 +66,60 @@ func (p *endpointPicker) checkEndpoint(ctx context.Context, endpoint string) {
         NewNfsClientLog(nfs_client.LOG_DEBUG),
     )
     if err != nil {
-        p.markAsUnhealthy(endpoint)
+        p.markAsUnhealthy(ctx, endpoint)
         return
     }
     defer client.Close()

+    logging.Debug(ctx, "pinging filestore endpoint %q", endpoint)
+
     _, err = client.Ping(ctx, &protos.TPingRequest{})
     if err != nil {
-        p.markAsUnhealthy(endpoint)
+        p.markAsUnhealthy(ctx, endpoint)
         return
     }

-    p.markAsHealthy(endpoint)
+    p.markAsHealthy(ctx, endpoint)
 }

-func (p *endpointPicker) markAsUnhealthy(endpoint string) {
+func (p *endpointPicker) markAsUnhealthy(
+    ctx context.Context,
+    endpoint string,
+) {
+
     p.mutex.Lock()
     defer p.mutex.Unlock()

     p.healthyEndpoints = common.Remove(p.healthyEndpoints, endpoint)
+    logging.Info(ctx, "filestore endpoint %q marked as unhealthy", endpoint)
 }

-func (p *endpointPicker) markAsHealthy(endpoint string) {
+func (p *endpointPicker) markAsHealthy(ctx context.Context, endpoint string) {
     p.mutex.Lock()
     defer p.mutex.Unlock()

     if !common.Find(p.healthyEndpoints, endpoint) {
         p.healthyEndpoints = append(p.healthyEndpoints, endpoint)
+        logging.Info(ctx, "filestore endpoint %q marked as healthy", endpoint)
+
+        p.markedAsHealthy.Broadcast()
     }
 }

-func (p *endpointPicker) pickEndpoint() string {
-    p.mutex.RLock()
-    defer p.mutex.RUnlock()
+func (p *endpointPicker) pickEndpoint(ctx context.Context) (string, error) {
+    p.mutex.Lock()
+    defer p.mutex.Unlock()

-    endpoints := p.healthyEndpoints
-    if len(endpoints) == 0 {
-        endpoints = p.endpoints
+    for len(p.healthyEndpoints) == 0 {
+        logging.Info(
+            ctx,
+            "waiting for one of filestore endpoints to become ready",
+        )
+        err := p.markedAsHealthy.Wait(ctx)
+        if err != nil {
+            return "", err
+        }
     }

-    return common.RandomElement(endpoints)
+    return common.RandomElement(p.healthyEndpoints), nil
 }
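
For illustration, a minimal in-package test sketch of the new pickEndpoint contract: it blocks until some endpoint is marked healthy and fails fast once the caller's context is cancelled. This test is not part of the commit; the test name, endpoint addresses, and timings are made up, and it assumes the sketch lives in the same package as endpoint_picker.go so the unexported helpers are reachable.

package nfs

import (
    "context"
    "testing"
    "time"

    "github.com/stretchr/testify/require"
)

func TestPickEndpointWaitsUntilSomeEndpointIsHealthy(t *testing.T) {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    picker := newEndpointPicker(ctx, []string{"localhost:12345"})

    // Nothing has been marked healthy yet, so pickEndpoint should block.
    // Mark the endpoint healthy from another goroutine after a short delay.
    go func() {
        time.Sleep(100 * time.Millisecond)
        picker.markAsHealthy(ctx, "localhost:12345")
    }()

    endpoint, err := picker.pickEndpoint(ctx)
    require.NoError(t, err)
    require.Equal(t, "localhost:12345", endpoint)

    // A cancelled context should unblock the waiter with an error instead of
    // hanging forever when no endpoint ever becomes healthy.
    cancelledCtx, cancelNow := context.WithCancel(context.Background())
    cancelNow()

    otherPicker := newEndpointPicker(ctx, []string{"localhost:54321"})
    _, err = otherPicker.pickEndpoint(cancelledCtx)
    require.Error(t, err)
}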
7 changes: 5 additions & 2 deletions cloud/disk_manager/internal/pkg/clients/nfs/factory.go
@@ -137,11 +137,14 @@ func (f *factory) NewClient(
         return nil, err
     }

-    endpointPicker := f.endpointPickers[zoneID]
+    endpoint, err := f.endpointPickers[zoneID].pickEndpoint(ctx)
+    if err != nil {
+        return nil, err
+    }

     nfs, err := nfs_client.NewClient(
         &nfs_client.GrpcClientOpts{
-            Endpoint: endpointPicker.pickEndpoint(),
+            Endpoint: endpoint,
             Credentials: clientCreds,
         },
         &nfs_client.DurableClientOpts{
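
A consequence for callers, sketched below and not part of this commit: NewClient can now block while no endpoint in the zone is healthy, so call sites that must not wait indefinitely should bound the call with their own deadline. The (ctx, zoneID) parameter list is inferred from the hunk header, the 30-second timeout is arbitrary, and the sketch assumes the returned client does not keep using the short-lived context after construction.

package example

import (
    "context"
    "time"

    "github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/clients/nfs"
)

// newFilestoreClientWithDeadline is a hypothetical helper around nfs.Factory.
func newFilestoreClientWithDeadline(
    ctx context.Context,
    factory nfs.Factory,
    zoneID string,
) (nfs.Client, error) {

    // Cap how long we are willing to wait for a healthy filestore endpoint;
    // without a deadline, NewClient blocks until some endpoint in the zone
    // is marked healthy or ctx is cancelled.
    pickCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
    defer cancel()

    return factory.NewClient(pickCtx, zoneID)
}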
17 changes: 11 additions & 6 deletions cloud/disk_manager/internal/pkg/clients/nfs/tests/client_test.go
@@ -34,7 +34,7 @@ func getEndpoint() string {

 func newFactory(t *testing.T, ctx context.Context) nfs.Factory {
     insecure := true
-    factory := nfs.NewFactory(
+    return nfs.NewFactory(
         ctx,
         &config.ClientConfig{
             Zones: map[string]*config.Zone{
@@ -46,7 +46,6 @@ func newFactory(t *testing.T, ctx context.Context) nfs.Factory {
         },
         metrics.NewEmptyRegistry(),
     )
-    return factory
 }

 func newClient(t *testing.T, ctx context.Context) nfs.Client {
@@ -58,22 +57,26 @@ func newClient(t *testing.T, ctx context.Context) nfs.Client {

 ////////////////////////////////////////////////////////////////////////////////

-func TestCreate(t *testing.T) {
+func TestCreateFilesystem(t *testing.T) {
     ctx := newContext()
     client := newClient(t, ctx)

     filesystemID := t.Name()

     err := client.Create(ctx, filesystemID, nfs.CreateFilesystemParams{
-        BlocksCount: 10,
+        FolderID: "folder",
+        CloudID: "cloud",
+        BlocksCount: 1024,
         BlockSize: 4096,
         Kind: types.FilesystemKind_FILESYSTEM_KIND_SSD,
     })
     require.NoError(t, err)

     // Creating the same filesystem twice is not an error
     err = client.Create(ctx, filesystemID, nfs.CreateFilesystemParams{
-        BlocksCount: 10,
+        FolderID: "folder",
+        CloudID: "cloud",
+        BlocksCount: 1024,
         BlockSize: 4096,
         Kind: types.FilesystemKind_FILESYSTEM_KIND_SSD,
     })
@@ -87,7 +90,9 @@ func TestDeleteFilesystem(t *testing.T) {
     filesystemID := t.Name()

     err := client.Create(ctx, filesystemID, nfs.CreateFilesystemParams{
-        BlocksCount: 10,
+        FolderID: "folder",
+        CloudID: "cloud",
+        BlocksCount: 1024,
         BlockSize: 4096,
         Kind: types.FilesystemKind_FILESYSTEM_KIND_SSD,
     })
4 changes: 4 additions & 0 deletions cloud/disk_manager/internal/pkg/common/cond.go
@@ -21,6 +21,10 @@ func (c *Cond) Signal() {
     c.cond.Signal()
 }

+func (c *Cond) Broadcast() {
+    c.cond.Broadcast()
+}
+
 // Waits for internal condvar event or for ctx cancellation.
 func (c *Cond) Wait(ctx context.Context) error {
     waitFinishedCtx, waitFinished := context.WithCancel(context.Background())
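
Wait is truncated in this hunk, so for context, here is one way a context-aware condition variable with this surface (NewCond, Signal, Broadcast, Wait) can be built on top of sync.Cond. It is a sketch, not necessarily the repository's actual implementation; the field names and the locker-based constructor are assumptions.

package common

import (
    "context"
    "sync"
)

// Cond is a sync.Cond whose Wait can also be interrupted by context
// cancellation. Waiters are expected to re-check their predicate in a loop,
// as pickEndpoint does above.
type Cond struct {
    locker sync.Locker
    cond   *sync.Cond
}

func NewCond(locker sync.Locker) Cond {
    return Cond{locker: locker, cond: sync.NewCond(locker)}
}

func (c *Cond) Signal()    { c.cond.Signal() }
func (c *Cond) Broadcast() { c.cond.Broadcast() }

// Wait must be called with the locker held. It returns a non-nil error if
// ctx is cancelled while waiting.
func (c *Cond) Wait(ctx context.Context) error {
    waitFinishedCtx, waitFinished := context.WithCancel(context.Background())
    defer waitFinished()

    go func() {
        select {
        case <-ctx.Done():
            // Take the lock before broadcasting so the wakeup cannot fire
            // before the waiter has entered cond.Wait (which releases the
            // lock atomically); otherwise the wakeup could be lost.
            c.locker.Lock()
            c.cond.Broadcast()
            c.locker.Unlock()
        case <-waitFinishedCtx.Done():
            // Wait finished on its own; nothing to do.
        }
    }()

    c.cond.Wait()
    return ctx.Err()
}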
3 changes: 2 additions & 1 deletion cloud/disk_manager/test/recipe/nfs_launcher.py
@@ -24,7 +24,8 @@ def __init__(self, ydb_port, domains_txt, names_txt, nfs_binary_path):
             # FIXME: use kikimr service, resolve tenant config issues
             service_type="local",
             verbose=True,
-            kikimr_port=ydb_port)
+            kikimr_port=ydb_port,
+        )
         self.__nfs_configurator.generate_configs(domains_txt, names_txt)

         self.__nfs_server = NfsServer(configurator=self.__nfs_configurator)
