Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: fixes to digital ocean droplet creation, logging, timeout finetuning #117

Merged
2 changes: 1 addition & 1 deletion core/monitoring/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func SetupPrometheusTask(ctx context.Context, logger *zap.Logger, p provider.Pro
"--web.console.libraries=/usr/share/prometheus/console_libraries",
"--web.console.templates=/usr/share/prometheus/consoles",
},
ContainerName: "prometheus",
ContainerName: "prometheus",
ProviderSpecificConfig: opts.ProviderSpecificConfig,
})
if err != nil {
Expand Down
4 changes: 1 addition & 3 deletions core/provider/digitalocean/digitalocean_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ func NewDigitalOceanProvider(ctx context.Context, logger *zap.Logger, providerNa
doClient := godo.NewFromToken(token)

sshPubKey, sshPrivKey, sshFingerprint, err := makeSSHKeyPair()

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -77,11 +76,10 @@ func NewDigitalOceanProvider(ctx context.Context, logger *zap.Logger, providerNa
firewall, err := digitalOceanProvider.createFirewall(ctx, userIPs)

if err != nil {
return nil, err
return nil, fmt.Errorf("failed to create firewall: %w", err)
}

digitalOceanProvider.firewallID = firewall.ID

_, err = digitalOceanProvider.createSSHKey(ctx, sshPubKey)

if err != nil {
Expand Down
11 changes: 8 additions & 3 deletions core/provider/digitalocean/droplet.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (p *Provider) CreateDroplet(ctx context.Context, definition provider.TaskDe

start := time.Now()

err = util.WaitForCondition(ctx, time.Second*600, time.Second*2, func() (bool, error) {
err = util.WaitForCondition(ctx, time.Second*300, time.Millisecond*300, func() (bool, error) {
d, _, err := p.doClient.Droplets.Get(ctx, droplet.ID)

if err != nil {
Expand All @@ -74,6 +74,7 @@ func (p *Provider) CreateDroplet(ctx context.Context, definition provider.TaskDe
dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.WithHost(fmt.Sprintf("tcp://%s:2375", ip)))

if err != nil {
p.logger.Error("failed to create docker client", zap.Error(err))
return false, err
}

Expand All @@ -83,6 +84,7 @@ func (p *Provider) CreateDroplet(ctx context.Context, definition provider.TaskDe
return false, nil
}

p.logger.Info("droplet is active", zap.Duration("after", time.Since(start)), zap.String("task", definition.Name))
return true, nil
})

Expand Down Expand Up @@ -117,13 +119,16 @@ func (p *Provider) deleteDroplet(ctx context.Context, name string) error {
return nil
}

func (p *Provider) getDroplet(ctx context.Context, name string) (*godo.Droplet, error) {
func (p *Provider) getDroplet(ctx context.Context, name string, returnOnCacheHit bool) (*godo.Droplet, error) {
cachedDroplet, ok := p.droplets.Load(name)

if !ok {
return nil, fmt.Errorf("could not find droplet %s", name)
}

if ok && returnOnCacheHit {
return cachedDroplet, nil
}

droplet, res, err := p.doClient.Droplets.Get(ctx, cachedDroplet.ID)

if err != nil {
Expand Down
59 changes: 46 additions & 13 deletions core/provider/digitalocean/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ import (
"bytes"
"context"
"fmt"
"io"
"net"
"path"
"time"

"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/mount"
Expand All @@ -15,10 +20,6 @@ import (
"github.com/spf13/afero"
"github.com/spf13/afero/sftpfs"
"go.uber.org/zap"
"io"
"net"
"path"
"time"
)

func (p *Provider) CreateTask(ctx context.Context, logger *zap.Logger, definition provider.TaskDefinition) (string, error) {
Expand All @@ -27,7 +28,7 @@ func (p *Provider) CreateTask(ctx context.Context, logger *zap.Logger, definitio
}

if definition.ProviderSpecificConfig == nil {
return "", fmt.Errorf("digitalocean specific config is nil")
return "", fmt.Errorf("digitalocean specific config is nil for %s", definition.Name)
}

_, ok := definition.ProviderSpecificConfig.(DigitalOceanTaskConfig)
Expand Down Expand Up @@ -122,7 +123,7 @@ func (p *Provider) StartTask(ctx context.Context, taskName string) error {
return err
}

err = util.WaitForCondition(ctx, time.Second*300, time.Second*2, func() (bool, error) {
err = util.WaitForCondition(ctx, time.Second*300, time.Millisecond*100, func() (bool, error) {
status, err := p.GetTaskStatus(ctx, taskName)
if err != nil {
return false, err
Expand Down Expand Up @@ -171,7 +172,7 @@ func (p *Provider) DestroyTask(ctx context.Context, taskName string) error {
}

func (p *Provider) GetTaskStatus(ctx context.Context, taskName string) (provider.TaskStatus, error) {
droplet, err := p.getDroplet(ctx, taskName)
droplet, err := p.getDroplet(ctx, taskName, false)

if err != nil {
return provider.TASK_STATUS_UNDEFINED, err
Expand Down Expand Up @@ -295,7 +296,7 @@ func (p *Provider) DownloadDir(ctx context.Context, s string, s2 string, s3 stri
}

func (p *Provider) GetIP(ctx context.Context, taskName string) (string, error) {
droplet, err := p.getDroplet(ctx, taskName)
droplet, err := p.getDroplet(ctx, taskName, true)

if err != nil {
return "", err
Expand Down Expand Up @@ -366,6 +367,7 @@ func (p *Provider) RunCommandWhileStopped(ctx context.Context, taskName string,
dockerClient, err := p.getDropletDockerClient(ctx, taskName)

if err != nil {
p.logger.Error("failed to get docker client", zap.Error(err), zap.String("taskName", taskName))
return "", "", 0, err
}

Expand Down Expand Up @@ -396,45 +398,76 @@ func (p *Provider) RunCommandWhileStopped(ctx context.Context, taskName string,
}, nil, nil, definition.ContainerName)

if err != nil {
p.logger.Error("failed to create container", zap.Error(err), zap.String("taskName", taskName))
return "", "", 0, err
}

defer dockerClient.ContainerRemove(ctx, createdContainer.ID, types.ContainerRemoveOptions{Force: true})

err = dockerClient.ContainerStart(ctx, createdContainer.ID, types.ContainerStartOptions{})

if err != nil {
if err := startContainerWithBlock(ctx, dockerClient, createdContainer.ID); err != nil {
p.logger.Error("failed to start container", zap.Error(err), zap.String("taskName", taskName))
return "", "", 0, err
}

// container is confirmed running; create an exec instance for the command
exec, err := dockerClient.ContainerExecCreate(ctx, createdContainer.ID, types.ExecConfig{
AttachStdout: true,
AttachStderr: true,
Cmd: command,
})

if err != nil {
p.logger.Error("failed to create exec", zap.Error(err), zap.String("taskName", taskName))
return "", "", 0, err
}

resp, err := dockerClient.ContainerExecAttach(ctx, exec.ID, types.ExecStartCheck{})

if err != nil {
p.logger.Error("failed to attach to exec", zap.Error(err), zap.String("taskName", taskName))
return "", "", 0, err
}

defer resp.Close()

execInspect, err := dockerClient.ContainerExecInspect(ctx, exec.ID)
if err != nil {
p.logger.Error("failed to inspect exec", zap.Error(err), zap.String("taskName", taskName))
return "", "", 0, err
}

var stdout, stderr bytes.Buffer

_, err = stdcopy.StdCopy(&stdout, &stderr, resp.Reader)

return stdout.String(), stderr.String(), execInspect.ExitCode, nil
return stdout.String(), stderr.String(), execInspect.ExitCode, err
}

func startContainerWithBlock(ctx context.Context, dockerClient *dockerclient.Client, containerID string) error {
// start container
if err := dockerClient.ContainerStart(ctx, containerID, types.ContainerStartOptions{}); err != nil {
return err
}

// cancel container after a minute
waitCtx, cancel := context.WithTimeout(ctx, 3*time.Minute)
defer cancel()
ticker := time.NewTicker(100 * time.Millisecond)
for {
select {
case <-waitCtx.Done():
return fmt.Errorf("error waiting for container to start: %v", waitCtx.Err())
case <-ticker.C:
container, err := dockerClient.ContainerInspect(ctx, containerID)
if err != nil {
return err
}

// if the container is running, we're done
if container.State.Running {
return nil
}
}
}
}

func (p *Provider) pullImage(ctx context.Context, dockerClient *dockerclient.Client, image string) error {
Expand Down
4 changes: 2 additions & 2 deletions core/provider/digitalocean/types.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package digitalocean

// DigitalOceanTaskConfig is the provider-specific configuration attached to a
// TaskDefinition when running against DigitalOcean. It selects where and how
// the droplet backing the task is created.
type DigitalOceanTaskConfig struct {
	Region  string // DigitalOcean region slug (e.g. "nyc1") — TODO confirm expected format against godo usage
	Size    string // droplet size slug (e.g. "s-1vcpu-1gb") — presumably passed straight to the DO API; verify
	ImageID int    // numeric ID of the droplet image to boot
}
4 changes: 2 additions & 2 deletions core/provider/docker/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (p *Provider) CreateTask(ctx context.Context, logger *zap.Logger, definitio
listeners.CloseAll()
return "", err
}

// network map is volatile, so we need to mutex update it
p.networkMu.Lock()
p.listeners[createdContainer.ID] = listeners
Expand Down Expand Up @@ -144,7 +144,7 @@ func (p *Provider) StartTask(ctx context.Context, id string) error {
if status == provider.TASK_RUNNING {
return nil
}
time.Sleep(time.Second)
time.Sleep(time.Millisecond * 100)
}
}

Expand Down
10 changes: 6 additions & 4 deletions core/provider/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,10 @@ func CreateTask(ctx context.Context, logger *zap.Logger, provider Provider, defi
return task, nil
}

// Start starts the underlying task's workload including its sidecars if startSidecars is set to true
// Start starts the underlying task's workload including its sidecars if startSidecars is set to true.
// This method does not take a lock on the provider, hence 2 threads may simultaneously call Start on the same task,
// this is not thread-safe: PLEASE DON'T DO THAT.
func (t *Task) Start(ctx context.Context, startSidecars bool) error {
t.mu.Lock()
defer t.mu.Unlock()

if startSidecars {
for _, sidecar := range t.Sidecars {
err := sidecar.Start(ctx, startSidecars)
Expand Down Expand Up @@ -161,18 +160,21 @@ func (t *Task) GetExternalAddress(ctx context.Context, port string) (string, err
// RunCommand executes command inside the task's workload. When the task is
// currently running the command is executed in the live container; otherwise
// it is delegated to RunCommandWhileStopped, which runs it in a temporary
// container. Returns stdout, stderr and the command's exit code.
func (t *Task) RunCommand(ctx context.Context, command []string) (string, string, int, error) {
	status, err := t.Provider.GetTaskStatus(ctx, t.ID)
	if err != nil {
		t.logger.Error("failed to get task status", zap.Error(err), zap.Any("definition", t.Definition))
		return "", "", 0, err
	}

	// Both branches below hold the task lock for the duration of the command,
	// so acquire it once here instead of separately in each branch.
	t.mu.Lock()
	defer t.mu.Unlock()

	if status == TASK_RUNNING {
		t.logger.Info("running command", zap.Strings("command", command), zap.String("status", "running"))
		return t.Provider.RunCommand(ctx, t.ID, command)
	}

	t.logger.Info("running command", zap.Strings("command", command), zap.String("status", "not running"))
	return t.Provider.RunCommandWhileStopped(ctx, t.ID, t.Definition, command)
}

Expand Down
2 changes: 1 addition & 1 deletion core/types/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ type ChainConfig struct {

NodeCreator NodeCreator // NodeCreator is a function that creates a node
NodeDefinitionModifier NodeDefinitionModifier // NodeDefinitionModifier is a function that modifies a node's definition
// number of tokens to allocate per account in the genesis state (unscaled). This value defaults to 10_000_000 if not set.
GenesisDelegation *big.Int
// number of tokens to allocate to the genesis account. This value defaults to 5_000_000 if not set.
Expand Down
Loading
Loading