Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: fixes to digital ocean droplet creation, logging, timeout finetuning #117

Merged
2 changes: 1 addition & 1 deletion core/monitoring/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func SetupPrometheusTask(ctx context.Context, logger *zap.Logger, p provider.Pro
"--web.console.libraries=/usr/share/prometheus/console_libraries",
"--web.console.templates=/usr/share/prometheus/consoles",
},
ContainerName: "prometheus",
ContainerName: "prometheus",
ProviderSpecificConfig: opts.ProviderSpecificConfig,
})
if err != nil {
Expand Down
1 change: 0 additions & 1 deletion core/provider/digitalocean/digitalocean_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ func NewDigitalOceanProvider(ctx context.Context, logger *zap.Logger, providerNa
doClient := godo.NewFromToken(token)

sshPubKey, sshPrivKey, sshFingerprint, err := makeSSHKeyPair()

if err != nil {
return nil, err
}
Expand Down
4 changes: 3 additions & 1 deletion core/provider/digitalocean/droplet.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (p *Provider) CreateDroplet(ctx context.Context, definition provider.TaskDe

start := time.Now()

err = util.WaitForCondition(ctx, time.Second*600, time.Second*2, func() (bool, error) {
err = util.WaitForCondition(ctx, time.Second*100, time.Millisecond*100, func() (bool, error) {
d, _, err := p.doClient.Droplets.Get(ctx, droplet.ID)

if err != nil {
Expand All @@ -74,6 +74,7 @@ func (p *Provider) CreateDroplet(ctx context.Context, definition provider.TaskDe
dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.WithHost(fmt.Sprintf("tcp://%s:2375", ip)))

if err != nil {
p.logger.Error("failed to create docker client", zap.Error(err))
return false, err
}

Expand All @@ -83,6 +84,7 @@ func (p *Provider) CreateDroplet(ctx context.Context, definition provider.TaskDe
return false, nil
}

p.logger.Info("droplet is active", zap.Duration("after", time.Since(start)), zap.String("task", definition.Name))
return true, nil
})

Expand Down
55 changes: 44 additions & 11 deletions core/provider/digitalocean/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ import (
"bytes"
"context"
"fmt"
"io"
"net"
"path"
"time"

"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/mount"
Expand All @@ -15,10 +20,6 @@ import (
"github.com/spf13/afero"
"github.com/spf13/afero/sftpfs"
"go.uber.org/zap"
"io"
"net"
"path"
"time"
)

func (p *Provider) CreateTask(ctx context.Context, logger *zap.Logger, definition provider.TaskDefinition) (string, error) {
Expand All @@ -27,7 +28,7 @@ func (p *Provider) CreateTask(ctx context.Context, logger *zap.Logger, definitio
}

if definition.ProviderSpecificConfig == nil {
return "", fmt.Errorf("digitalocean specific config is nil")
return "", fmt.Errorf("digitalocean specific config is nil for %s", definition.Name)
}

_, ok := definition.ProviderSpecificConfig.(DigitalOceanTaskConfig)
Expand Down Expand Up @@ -122,7 +123,7 @@ func (p *Provider) StartTask(ctx context.Context, taskName string) error {
return err
}

err = util.WaitForCondition(ctx, time.Second*300, time.Second*2, func() (bool, error) {
err = util.WaitForCondition(ctx, time.Second*300, time.Millisecond*100, func() (bool, error) {
status, err := p.GetTaskStatus(ctx, taskName)
if err != nil {
return false, err
Expand Down Expand Up @@ -366,6 +367,7 @@ func (p *Provider) RunCommandWhileStopped(ctx context.Context, taskName string,
dockerClient, err := p.getDropletDockerClient(ctx, taskName)

if err != nil {
p.logger.Error("failed to get docker client", zap.Error(err), zap.String("taskName", taskName))
return "", "", 0, err
}

Expand Down Expand Up @@ -396,45 +398,76 @@ func (p *Provider) RunCommandWhileStopped(ctx context.Context, taskName string,
}, nil, nil, definition.ContainerName)

if err != nil {
p.logger.Error("failed to create container", zap.Error(err), zap.String("taskName", taskName))
return "", "", 0, err
}

defer dockerClient.ContainerRemove(ctx, createdContainer.ID, types.ContainerRemoveOptions{Force: true})

err = dockerClient.ContainerStart(ctx, createdContainer.ID, types.ContainerStartOptions{})

if err != nil {
if err := startContainerWithBlock(ctx, dockerClient, createdContainer.ID); err != nil {
p.logger.Error("failed to start container", zap.Error(err), zap.String("taskName", taskName))
return "", "", 0, err
}

// wait for container start
exec, err := dockerClient.ContainerExecCreate(ctx, createdContainer.ID, types.ExecConfig{
AttachStdout: true,
AttachStderr: true,
Cmd: command,
})

if err != nil {
p.logger.Error("failed to create exec", zap.Error(err), zap.String("taskName", taskName))
return "", "", 0, err
}

resp, err := dockerClient.ContainerExecAttach(ctx, exec.ID, types.ExecStartCheck{})

if err != nil {
p.logger.Error("failed to attach to exec", zap.Error(err), zap.String("taskName", taskName))
return "", "", 0, err
}

defer resp.Close()

execInspect, err := dockerClient.ContainerExecInspect(ctx, exec.ID)
if err != nil {
p.logger.Error("failed to inspect exec", zap.Error(err), zap.String("taskName", taskName))
return "", "", 0, err
}

var stdout, stderr bytes.Buffer

_, err = stdcopy.StdCopy(&stdout, &stderr, resp.Reader)

return stdout.String(), stderr.String(), execInspect.ExitCode, nil
return stdout.String(), stderr.String(), execInspect.ExitCode, err
}

func startContainerWithBlock(ctx context.Context, dockerClient *dockerclient.Client, containerID string) error {
// start container
if err := dockerClient.ContainerStart(ctx, containerID, types.ContainerStartOptions{}); err != nil {
return err
}

// cancel container after a minute
waitCtx, cancel := context.WithTimeout(ctx, 3*time.Minute)
defer cancel()
ticker := time.NewTicker(100 * time.Millisecond)
for {
select {
case <-waitCtx.Done():
return fmt.Errorf("error waiting for container to start: %v", waitCtx.Err())
case <-ticker.C:
container, err := dockerClient.ContainerInspect(ctx, containerID)
if err != nil {
return err
}

// if the container is running, we're done
if container.State.Running {
return nil
}
}
}
}

func (p *Provider) pullImage(ctx context.Context, dockerClient *dockerclient.Client, image string) error {
Expand Down
4 changes: 2 additions & 2 deletions core/provider/digitalocean/types.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package digitalocean

type DigitalOceanTaskConfig struct {
Region string
Size string
Region string
Size string
ImageID int
}
4 changes: 2 additions & 2 deletions core/provider/docker/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (p *Provider) CreateTask(ctx context.Context, logger *zap.Logger, definitio
listeners.CloseAll()
return "", err
}

// network map is volatile, so we need to mutex update it
p.networkMu.Lock()
p.listeners[createdContainer.ID] = listeners
Expand Down Expand Up @@ -144,7 +144,7 @@ func (p *Provider) StartTask(ctx context.Context, id string) error {
if status == provider.TASK_RUNNING {
return nil
}
time.Sleep(time.Second)
time.Sleep(time.Millisecond * 100)
}
}

Expand Down
3 changes: 3 additions & 0 deletions core/provider/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,18 +161,21 @@ func (t *Task) GetExternalAddress(ctx context.Context, port string) (string, err
func (t *Task) RunCommand(ctx context.Context, command []string) (string, string, int, error) {
status, err := t.Provider.GetTaskStatus(ctx, t.ID)
if err != nil {
t.logger.Error("failed to get task status", zap.Error(err), zap.Any("definition", t.Definition))
return "", "", 0, err
}

if status == TASK_RUNNING {
t.mu.Lock()
defer t.mu.Unlock()
t.logger.Info("running command", zap.Strings("command", command), zap.String("status", "running"))
return t.Provider.RunCommand(ctx, t.ID, command)
}

t.mu.Lock()
defer t.mu.Unlock()

t.logger.Info("running command", zap.Strings("command", command), zap.String("status", "not running"))
return t.Provider.RunCommandWhileStopped(ctx, t.ID, t.Definition, command)
}

Expand Down
2 changes: 1 addition & 1 deletion core/types/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ type ChainConfig struct {

NodeCreator NodeCreator // NodeCreator is a function that creates a node
NodeDefinitionModifier NodeDefinitionModifier // NodeDefinitionModifier is a function that modifies a node's definition
// number of tokens to allocate per account in the genesis state (unscaled). This value defaults to 10_000_000 if not set.
// number of tokens to allocate per account in the genesis state (unscaled). This value defaults to 10_000_000 if not set.
// if not set.
GenesisDelegation *big.Int
// number of tokens to allocate to the genesis account. This value defaults to 5_000_000 if not set.
Expand Down
16 changes: 12 additions & 4 deletions cosmos/chain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ func CreateChain(ctx context.Context, logger *zap.Logger, infraProvider provider
Provider: infraProvider,
Chain: &chain,
})

if err != nil {
return err
}
Expand Down Expand Up @@ -178,8 +177,9 @@ func (c *Chain) Init(ctx context.Context) error {
v := v
idx := idx
eg.Go(func() error {
c.logger.Info("setting up validator home dir", zap.String("validator", v.GetTask().Definition.Name))
if err := v.InitHome(ctx); err != nil {
return err
return fmt.Errorf("error initializing home dir: %v", err)
}

validatorWallet, err := v.CreateWallet(ctx, petritypes.ValidatorKeyName, c.Config.WalletConfig)
Expand Down Expand Up @@ -212,6 +212,7 @@ func (c *Chain) Init(ctx context.Context) error {
n := n

eg.Go(func() error {
c.logger.Info("setting up node home dir", zap.String("node", n.GetTask().Definition.Name))
if err := n.InitHome(ctx); err != nil {
return err
}
Expand All @@ -224,6 +225,7 @@ func (c *Chain) Init(ctx context.Context) error {
return err
}

c.logger.Info("adding faucet genesis")
faucetWallet, err := c.BuildWallet(ctx, petritypes.FaucetAccountKeyName, "", c.Config.WalletConfig)

if err != nil {
Expand All @@ -246,6 +248,7 @@ func (c *Chain) Init(ctx context.Context) error {
return err
}

c.logger.Info("setting up validator keys", zap.String("validator", validatorN.GetTask().Definition.Name), zap.String("address", bech32))
if err := firstValidator.AddGenesisAccount(ctx, bech32, genesisAmounts); err != nil {
return err
}
Expand Down Expand Up @@ -287,6 +290,7 @@ func (c *Chain) Init(ctx context.Context) error {
}

for _, v := range c.Validators {
c.logger.Info("overwriting genesis for validator", zap.String("validator", v.GetTask().Definition.Name))
if err := v.OverwriteGenesisFile(ctx, genbz); err != nil {
return err
}
Expand All @@ -299,12 +303,14 @@ func (c *Chain) Init(ctx context.Context) error {
}

for _, v := range c.Validators {
c.logger.Info("starting validator task", zap.String("validator", v.GetTask().Definition.Name))
if err := v.GetTask().Start(ctx, true); err != nil {
return err
}
}

for _, n := range c.Nodes {
c.logger.Info("overwriting node genesis", zap.String("node", n.GetTask().Definition.Name))
if err := n.OverwriteGenesisFile(ctx, genbz); err != nil {
return err
}
Expand All @@ -317,6 +323,7 @@ func (c *Chain) Init(ctx context.Context) error {
}

for _, n := range c.Nodes {
c.logger.Info("starting node task", zap.String("node", n.GetTask().Definition.Name))
if err := n.GetTask().Start(ctx, true); err != nil {
return err
}
Expand Down Expand Up @@ -429,17 +436,18 @@ func (c *Chain) WaitForHeight(ctx context.Context, desiredHeight uint64) error {
c.logger.Info("waiting for height", zap.Uint64("desired_height", desiredHeight))
for {
c.logger.Debug("waiting for height", zap.Uint64("desired_height", desiredHeight))

height, err := c.Height(ctx)
if err != nil {
c.logger.Error("failed to get height", zap.Error(err))
time.Sleep(2 * time.Second)
continue
}

if height >= desiredHeight {
break
}

// We assume the chain will eventually return a non-zero height, otherwise
// this may block indefinitely.
if height == 0 {
Expand Down
2 changes: 1 addition & 1 deletion cosmos/node/genesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (n *Node) AddGenesisAccount(ctx context.Context, address string, genesisAmo
amount += fmt.Sprintf("%s%s", coin.Amount.String(), coin.Denom)
}

ctx, cancel := context.WithTimeout(ctx, time.Minute)
ctx, cancel := context.WithTimeout(ctx, 3*time.Minute)
defer cancel()

var command []string
Expand Down
Loading