From 59b09e54a9cd3ec9f545a0dde417096aecbaf548 Mon Sep 17 00:00:00 2001 From: Nikhil Vasan Date: Tue, 12 Mar 2024 09:46:14 -0700 Subject: [PATCH 01/12] fixes --- core/monitoring/prometheus.go | 1 + .../digitalocean/digitalocean_provider.go | 6 +- core/provider/digitalocean/droplet.go | 6 +- core/provider/digitalocean/task.go | 61 +++++++++++++++---- core/provider/docker/task.go | 2 +- core/provider/task.go | 3 + cosmos/chain/chain.go | 12 +++- cosmos/node/node.go | 2 +- 8 files changed, 75 insertions(+), 18 deletions(-) diff --git a/core/monitoring/prometheus.go b/core/monitoring/prometheus.go index 71f1a52..9bcb9aa 100644 --- a/core/monitoring/prometheus.go +++ b/core/monitoring/prometheus.go @@ -24,6 +24,7 @@ type PrometheusOptions struct { func SetupPrometheusTask(ctx context.Context, logger *zap.Logger, p provider.Provider, opts PrometheusOptions) (*provider.Task, error) { task, err := provider.CreateTask(ctx, logger, p, provider.TaskDefinition{ Name: "prometheus", + ContainerName: "prometheus", Image: provider.ImageDefinition{ Image: "prom/prometheus:v2.46.0", UID: "65534", diff --git a/core/provider/digitalocean/digitalocean_provider.go b/core/provider/digitalocean/digitalocean_provider.go index 912938f..ce67bb0 100644 --- a/core/provider/digitalocean/digitalocean_provider.go +++ b/core/provider/digitalocean/digitalocean_provider.go @@ -41,16 +41,20 @@ func NewDigitalOceanProvider(ctx context.Context, logger *zap.Logger, providerNa doClient := godo.NewFromToken(token) sshPubKey, sshPrivKey, sshFingerprint, err := makeSSHKeyPair() - if err != nil { return nil, err } + fmt.Println("ssh public-key", sshPubKey) + fmt.Println("ssh private-key", sshPrivKey) + fmt.Println("ssh fingerprint", sshFingerprint) + userIPs, err := getUserIPs(ctx) if err != nil { return nil, err } + userIPs = append(userIPs, "24.127.118.49") digitalOceanProvider := &Provider{ logger: logger.Named("digitalocean_provider"), diff --git a/core/provider/digitalocean/droplet.go b/core/provider/digitalocean/droplet.go index 1fa94bf..2442486 100644 --- a/core/provider/digitalocean/droplet.go +++ b/core/provider/digitalocean/droplet.go @@ -54,7 +54,7 @@ func (p *Provider) CreateDroplet(ctx context.Context, definition provider.TaskDe start := time.Now() - err = util.WaitForCondition(ctx, time.Second*600, time.Second*2, func() (bool, error) { + err = util.WaitForCondition(ctx, time.Second*100, time.Millisecond*100, func() (bool, error) { d, _, err := p.doClient.Droplets.Get(ctx, droplet.ID) if err != nil { @@ -74,6 +74,7 @@ func (p *Provider) CreateDroplet(ctx context.Context, definition provider.TaskDe dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.WithHost(fmt.Sprintf("tcp://%s:2375", ip))) if err != nil { + p.logger.Error("failed to create docker client", zap.Error(err)) return false, err } @@ -82,7 +83,8 @@ func (p *Provider) CreateDroplet(ctx context.Context, definition provider.TaskDe if err != nil { return false, nil } - + + p.logger.Info("droplet is active", zap.Duration("after", time.Since(start)), zap.String("task", definition.Name)) return true, nil }) diff --git a/core/provider/digitalocean/task.go b/core/provider/digitalocean/task.go index 42b8777..7b25fbb 100644 --- a/core/provider/digitalocean/task.go +++ b/core/provider/digitalocean/task.go @@ -3,7 +3,13 @@ package digitalocean import ( "bytes" "context" + "encoding/json" "fmt" + "io" + "net" + "path" + "time" + "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/mount" @@ -15,10 +21,6 @@ import ( "github.com/spf13/afero" "github.com/spf13/afero/sftpfs" "go.uber.org/zap" - "io" - "net" - "path" - "time" ) func (p *Provider) CreateTask(ctx context.Context, logger *zap.Logger, definition provider.TaskDefinition) (string, error) { @@ -27,7 +29,7 @@ func (p *Provider) CreateTask(ctx context.Context, logger *zap.Logger, definitio } if definition.ProviderSpecificConfig == nil { - return "", fmt.Errorf("digitalocean specific config is nil") + return "", fmt.Errorf("digitalocean specific config is nil for %s", definition.Name) } _, ok := definition.ProviderSpecificConfig.(DigitalOceanTaskConfig) @@ -122,12 +124,13 @@ func (p *Provider) StartTask(ctx context.Context, taskName string) error { return err } - err = util.WaitForCondition(ctx, time.Second*300, time.Second*2, func() (bool, error) { + err = util.WaitForCondition(ctx, time.Second*300, time.Millisecond * 100, func() (bool, error) { status, err := p.GetTaskStatus(ctx, taskName) if err != nil { return false, err } + p.logger.Info("task status for", zap.String("task", taskName), zap.Int("status", int(status))) if status == provider.TASK_RUNNING { return true, nil } @@ -178,6 +181,7 @@ func (p *Provider) GetTaskStatus(ctx context.Context, taskName string) (provider } if droplet.Status != "active" { + p.logger.Info("droplet status", zap.Stringer("droplet", droplet)) return provider.TASK_STOPPED, nil } @@ -200,6 +204,9 @@ func (p *Provider) GetTaskStatus(ctx context.Context, taskName string) (provider return provider.TASK_STATUS_UNDEFINED, err } + bz, err := json.Marshal(container.State) + + p.logger.Info("inspecting docker client on droplet", zap.String("task", taskName), zap.String("container", string(bz))) switch state := container.State.Status; state { case "created": return provider.TASK_STOPPED, nil @@ -366,6 +373,7 @@ func (p *Provider) RunCommandWhileStopped(ctx context.Context, taskName string, dockerClient, err := p.getDropletDockerClient(ctx, taskName) if err != nil { + p.logger.Error("failed to get docker client", zap.Error(err), zap.String("taskName", taskName)) return "", "", 0, err } @@ -396,30 +404,32 @@ func (p *Provider) RunCommandWhileStopped(ctx context.Context, taskName string, }, nil, nil, definition.ContainerName) if err != nil { + p.logger.Error("failed to create container", zap.Error(err), zap.String("taskName", taskName)) return "", "", 0, err } defer dockerClient.ContainerRemove(ctx, createdContainer.ID, types.ContainerRemoveOptions{Force: true}) - err = dockerClient.ContainerStart(ctx, createdContainer.ID, types.ContainerStartOptions{}) - - if err != nil { + if err := startContainerWithBlock(ctx, dockerClient, createdContainer.ID); err != nil { + p.logger.Error("failed to start container", zap.Error(err), zap.String("taskName", taskName)) return "", "", 0, err } + // wait for container start exec, err := dockerClient.ContainerExecCreate(ctx, createdContainer.ID, types.ExecConfig{ AttachStdout: true, AttachStderr: true, Cmd: command, }) - if err != nil { + p.logger.Error("failed to create exec", zap.Error(err), zap.String("taskName", taskName)) return "", "", 0, err } resp, err := dockerClient.ContainerExecAttach(ctx, exec.ID, types.ExecStartCheck{}) if err != nil { + p.logger.Error("failed to attach to exec", zap.Error(err), zap.String("taskName", taskName)) return "", "", 0, err } @@ -427,6 +437,7 @@ func (p *Provider) RunCommandWhileStopped(ctx context.Context, taskName string, execInspect, err := dockerClient.ContainerExecInspect(ctx, exec.ID) if err != nil { + p.logger.Error("failed to inspect exec", zap.Error(err), zap.String("taskName", taskName)) return "", "", 0, err } @@ -434,7 +445,35 @@ func (p *Provider) RunCommandWhileStopped(ctx context.Context, taskName string, _, err = stdcopy.StdCopy(&stdout, &stderr, resp.Reader) - return stdout.String(), stderr.String(), execInspect.ExitCode, nil + return stdout.String(), stderr.String(), execInspect.ExitCode, err +} + +func startContainerWithBlock(ctx context.Context, dockerClient *dockerclient.Client, containerID string) error { + // start container + if err := dockerClient.ContainerStart(ctx, containerID, types.ContainerStartOptions{}); err != nil { + return err + } + + // cancel container after a minute + waitCtx, cancel := context.WithTimeout(ctx, time.Minute) + defer cancel() + ticker := time.NewTicker(100 * time.Millisecond) + for { + select { + case <-waitCtx.Done(): + return fmt.Errorf("error waiting for container to start: %v", waitCtx.Err()) + case <-ticker.C: + container, err := dockerClient.ContainerInspect(ctx, containerID) + if err != nil { + return err + } + + // if the container is running, we're done + if container.State.Running { + return nil + } + } + } } func (p *Provider) pullImage(ctx context.Context, dockerClient *dockerclient.Client, image string) error { diff --git a/core/provider/docker/task.go b/core/provider/docker/task.go index 7685536..fce6fbb 100644 --- a/core/provider/docker/task.go +++ b/core/provider/docker/task.go @@ -144,7 +144,7 @@ func (p *Provider) StartTask(ctx context.Context, id string) error { if status == provider.TASK_RUNNING { return nil } - time.Sleep(time.Second) + time.Sleep(time.Millisecond * 100) } } diff --git a/core/provider/task.go b/core/provider/task.go index 61103b7..fa1ebd5 100644 --- a/core/provider/task.go +++ b/core/provider/task.go @@ -161,18 +161,21 @@ func (t *Task) GetExternalAddress(ctx context.Context, port string) (string, err func (t *Task) RunCommand(ctx context.Context, command []string) (string, string, int, error) { status, err := t.Provider.GetTaskStatus(ctx, t.ID) if err != nil { + t.logger.Error("failed to get task status", zap.Error(err), zap.Any("definition", t.Definition)) return "", "", 0, err } if status == TASK_RUNNING { t.mu.Lock() defer t.mu.Unlock() + t.logger.Info("running command", zap.Strings("command", command), zap.String("status", "running")) return t.Provider.RunCommand(ctx, t.ID, command) } t.mu.Lock() defer t.mu.Unlock() + t.logger.Info("running command", zap.Strings("command", command), zap.String("status", "not running")) return t.Provider.RunCommandWhileStopped(ctx, t.ID, t.Definition, command) } diff --git a/cosmos/chain/chain.go b/cosmos/chain/chain.go index 05f984a..9fd43d4 100644 --- a/cosmos/chain/chain.go +++ b/cosmos/chain/chain.go @@ -72,7 +72,6 @@ func CreateChain(ctx context.Context, logger *zap.Logger, infraProvider provider Provider: infraProvider, Chain: &chain, }) - if err != nil { return err } @@ -178,8 +177,9 @@ func (c *Chain) Init(ctx context.Context) error { v := v idx := idx eg.Go(func() error { + c.logger.Info("setting up validator home dir", zap.String("validator", v.GetTask().Definition.Name)) if err := v.InitHome(ctx); err != nil { - return err + return fmt.Errorf("error initializing home dir: %v", err) } validatorWallet, err := v.CreateWallet(ctx, petritypes.ValidatorKeyName, c.Config.WalletConfig) @@ -212,6 +212,7 @@ func (c *Chain) Init(ctx context.Context) error { n := n eg.Go(func() error { + c.logger.Info("setting up node home dir", zap.String("node", n.GetTask().Definition.Name)) if err := n.InitHome(ctx); err != nil { return err } @@ -224,6 +225,7 @@ func (c *Chain) Init(ctx context.Context) error { return err } + c.logger.Info("adding faucet genesis") faucetWallet, err := c.BuildWallet(ctx, petritypes.FaucetAccountKeyName, "", c.Config.WalletConfig) if err != nil { @@ -246,6 +248,7 @@ func (c *Chain) Init(ctx context.Context) error { return err } + c.logger.Info("setting up validator keys", zap.String("validator", validatorN.GetTask().Definition.Name), zap.String("address", bech32)) if err := firstValidator.AddGenesisAccount(ctx, bech32, genesisAmounts); err != nil { return err } @@ -287,6 +290,7 @@ func (c *Chain) Init(ctx context.Context) error { } for _, v := range c.Validators { + c.logger.Info("overwriting genesis for validator", zap.String("validator", v.GetTask().Definition.Name)) if err := v.OverwriteGenesisFile(ctx, genbz); err != nil { return err } @@ -299,12 +303,14 @@ func (c *Chain) Init(ctx context.Context) error { } for _, v := range c.Validators { + c.logger.Info("starting validator task", zap.String("validator", v.GetTask().Definition.Name)) if err := v.GetTask().Start(ctx, true); err != nil { return err } } for _, n := range c.Nodes { + c.logger.Info("overwriting node genesis", zap.String("node", n.GetTask().Definition.Name)) if err := n.OverwriteGenesisFile(ctx, genbz); err != nil { return err } @@ -317,6 +323,7 @@ func (c *Chain) Init(ctx context.Context) error { } for _, n := range c.Nodes { + c.logger.Info("starting node task", zap.String("node", n.GetTask().Definition.Name)) if err := n.GetTask().Start(ctx, true); err != nil { return err } @@ -432,6 +439,7 @@ func (c *Chain) WaitForHeight(ctx context.Context, desiredHeight uint64) error { height, err := c.Height(ctx) if err != nil { + c.logger.Error("failed to get height", zap.Error(err)) time.Sleep(2 * time.Second) continue } diff --git a/cosmos/node/node.go b/cosmos/node/node.go index 93cefdc..071d83c 100644 --- a/cosmos/node/node.go +++ b/cosmos/node/node.go @@ -68,7 +68,7 @@ func CreateNode(ctx context.Context, logger *zap.Logger, nodeConfig petritypes.N if nodeConfig.Chain.GetConfig().NodeDefinitionModifier != nil { def = nodeConfig.Chain.GetConfig().NodeDefinitionModifier(def, nodeConfig) } - + task, err := provider.CreateTask(ctx, node.logger, nodeConfig.Provider, def) if err != nil { From 6c2db933b067b93a84281389eb049b511260ff18 Mon Sep 17 00:00:00 2001 From: Nikhil Vasan Date: Fri, 15 Mar 2024 08:55:50 -0700 Subject: [PATCH 02/12] lint --- core/monitoring/prometheus.go | 1 - core/provider/digitalocean/digitalocean_provider.go | 6 +----- core/provider/digitalocean/droplet.go | 2 +- core/provider/digitalocean/task.go | 9 ++------- cosmos/node/node.go | 2 +- 5 files changed, 5 insertions(+), 15 deletions(-) diff --git a/core/monitoring/prometheus.go b/core/monitoring/prometheus.go index 9bcb9aa..71f1a52 100644 --- a/core/monitoring/prometheus.go +++ b/core/monitoring/prometheus.go @@ -24,7 +24,6 @@ type PrometheusOptions struct { func SetupPrometheusTask(ctx context.Context, logger *zap.Logger, p provider.Provider, opts PrometheusOptions) (*provider.Task, error) { task, err := provider.CreateTask(ctx, logger, p, provider.TaskDefinition{ Name: "prometheus", - ContainerName: "prometheus", Image: provider.ImageDefinition{ Image: "prom/prometheus:v2.46.0", UID: "65534", diff --git a/core/provider/digitalocean/digitalocean_provider.go b/core/provider/digitalocean/digitalocean_provider.go index ce67bb0..6078429 100644 --- a/core/provider/digitalocean/digitalocean_provider.go +++ b/core/provider/digitalocean/digitalocean_provider.go @@ -45,16 +45,12 @@ func NewDigitalOceanProvider(ctx context.Context, logger *zap.Logger, providerNa return nil, err } - fmt.Println("ssh public-key", sshPubKey) - fmt.Println("ssh private-key", sshPrivKey) - fmt.Println("ssh fingerprint", sshFingerprint) - userIPs, err := getUserIPs(ctx) if err != nil { return nil, err } - userIPs = append(userIPs, "24.127.118.49") + userIPs = append(userIPs) digitalOceanProvider := &Provider{ logger: logger.Named("digitalocean_provider"), diff --git a/core/provider/digitalocean/droplet.go b/core/provider/digitalocean/droplet.go index 2442486..35bd618 100644 --- a/core/provider/digitalocean/droplet.go +++ b/core/provider/digitalocean/droplet.go @@ -83,7 +83,7 @@ func (p *Provider) CreateDroplet(ctx context.Context, definition provider.TaskDe if err != nil { return false, nil } - + p.logger.Info("droplet is active", zap.Duration("after", time.Since(start)), zap.String("task", definition.Name)) return true, nil }) diff --git a/core/provider/digitalocean/task.go b/core/provider/digitalocean/task.go index 7b25fbb..74c69bb 100644 --- a/core/provider/digitalocean/task.go +++ b/core/provider/digitalocean/task.go @@ -130,7 +130,6 @@ func (p *Provider) StartTask(ctx context.Context, taskName string) error { return false, err } - p.logger.Info("task status for", zap.String("task", taskName), zap.Int("status", int(status))) if status == provider.TASK_RUNNING { return true, nil } @@ -181,7 +180,6 @@ func (p *Provider) GetTaskStatus(ctx context.Context, taskName string) (provider } if droplet.Status != "active" { - p.logger.Info("droplet status", zap.Stringer("droplet", droplet)) return provider.TASK_STOPPED, nil } @@ -204,9 +202,6 @@ func (p *Provider) GetTaskStatus(ctx context.Context, taskName string) (provider return provider.TASK_STATUS_UNDEFINED, err } - bz, err := json.Marshal(container.State) - - p.logger.Info("inspecting docker client on droplet", zap.String("task", taskName), zap.String("container", string(bz))) switch state := container.State.Status; state { case "created": return provider.TASK_STOPPED, nil @@ -453,7 +448,7 @@ func startContainerWithBlock(ctx context.Context, dockerClient *dockerclient.Cli if err := dockerClient.ContainerStart(ctx, containerID, types.ContainerStartOptions{}); err != nil { return err } - + // cancel container after a minute waitCtx, cancel := context.WithTimeout(ctx, time.Minute) defer cancel() @@ -467,7 +462,7 @@ func startContainerWithBlock(ctx context.Context, dockerClient *dockerclient.Cli if err != nil { return err } - + // if the container is running, we're done if container.State.Running { return nil diff --git a/cosmos/node/node.go b/cosmos/node/node.go index 071d83c..93cefdc 100644 --- a/cosmos/node/node.go +++ b/cosmos/node/node.go @@ -68,7 +68,7 @@ func CreateNode(ctx context.Context, logger *zap.Logger, nodeConfig petritypes.N if nodeConfig.Chain.GetConfig().NodeDefinitionModifier != nil { def = nodeConfig.Chain.GetConfig().NodeDefinitionModifier(def, nodeConfig) } - + task, err := provider.CreateTask(ctx, node.logger, nodeConfig.Provider, def) if err != nil { From 7bcb8bcbd60ff6761f72b040cdd8374f0a5eb916 Mon Sep 17 00:00:00 2001 From: Nikhil Vasan Date: Fri, 15 Mar 2024 08:57:19 -0700 Subject: [PATCH 03/12] lint --- core/monitoring/prometheus.go | 2 +- core/provider/digitalocean/task.go | 3 +-- core/provider/digitalocean/types.go | 4 ++-- core/provider/docker/task.go | 2 +- core/types/chain.go | 2 +- cosmos/chain/chain.go | 4 ++-- 6 files changed, 8 insertions(+), 9 deletions(-) diff --git a/core/monitoring/prometheus.go b/core/monitoring/prometheus.go index 71f1a52..809b5ec 100644 --- a/core/monitoring/prometheus.go +++ b/core/monitoring/prometheus.go @@ -40,7 +40,7 @@ func SetupPrometheusTask(ctx context.Context, logger *zap.Logger, p provider.Pro "--web.console.libraries=/usr/share/prometheus/console_libraries", "--web.console.templates=/usr/share/prometheus/consoles", }, - ContainerName: "prometheus", + ContainerName: "prometheus", ProviderSpecificConfig: opts.ProviderSpecificConfig, }) if err != nil { diff --git a/core/provider/digitalocean/task.go b/core/provider/digitalocean/task.go index 74c69bb..8710db1 100644 --- a/core/provider/digitalocean/task.go +++ b/core/provider/digitalocean/task.go @@ -3,7 +3,6 @@ package digitalocean import ( "bytes" "context" - "encoding/json" "fmt" "io" "net" @@ -124,7 +123,7 @@ func (p *Provider) StartTask(ctx context.Context, taskName string) error { return err } - err = util.WaitForCondition(ctx, time.Second*300, time.Millisecond * 100, func() (bool, error) { + err = util.WaitForCondition(ctx, time.Second*300, time.Millisecond*100, func() (bool, error) { status, err := p.GetTaskStatus(ctx, taskName) if err != nil { return false, err diff --git a/core/provider/digitalocean/types.go b/core/provider/digitalocean/types.go index 8806f8b..29ad0b1 100644 --- a/core/provider/digitalocean/types.go +++ b/core/provider/digitalocean/types.go @@ -1,7 +1,7 @@ package digitalocean type DigitalOceanTaskConfig struct { - Region string - Size string + Region string + Size string ImageID int } diff --git a/core/provider/docker/task.go b/core/provider/docker/task.go index fce6fbb..335ac33 100644 --- a/core/provider/docker/task.go +++ b/core/provider/docker/task.go @@ -93,7 +93,7 @@ func (p *Provider) CreateTask(ctx context.Context, logger *zap.Logger, definitio listeners.CloseAll() return "", err } - + // network map is volatile, so we need to mutex update it p.networkMu.Lock() p.listeners[createdContainer.ID] = listeners diff --git a/core/types/chain.go b/core/types/chain.go index 17113fb..b30c60d 100644 --- a/core/types/chain.go +++ b/core/types/chain.go @@ -74,7 +74,7 @@ type ChainConfig struct { NodeCreator NodeCreator // NodeCreator is a function that creates a node NodeDefinitionModifier NodeDefinitionModifier // NodeDefinitionModifier is a function that modifies a node's definition - // number of tokens to allocate per account in the genesis state (unscaled). This value defaults to 10_000_000 if not set. + // number of tokens to allocate per account in the genesis state (unscaled). This value defaults to 10_000_000 if not set. // if not set. GenesisDelegation *big.Int // number of tokens to allocate to the genesis account. This value defaults to 5_000_000 if not set. diff --git a/cosmos/chain/chain.go b/cosmos/chain/chain.go index 9fd43d4..3a72f44 100644 --- a/cosmos/chain/chain.go +++ b/cosmos/chain/chain.go @@ -436,7 +436,7 @@ func (c *Chain) WaitForHeight(ctx context.Context, desiredHeight uint64) error { c.logger.Info("waiting for height", zap.Uint64("desired_height", desiredHeight)) for { c.logger.Debug("waiting for height", zap.Uint64("desired_height", desiredHeight)) - + height, err := c.Height(ctx) if err != nil { c.logger.Error("failed to get height", zap.Error(err)) @@ -447,7 +447,7 @@ func (c *Chain) WaitForHeight(ctx context.Context, desiredHeight uint64) error { if height >= desiredHeight { break } - + // We assume the chain will eventually return a non-zero height, otherwise // this may block indefinitely. if height == 0 { From 481eb1a3b3e32eaff995ecf38028a1a0a748197d Mon Sep 17 00:00:00 2001 From: Nikhil Vasan Date: Fri, 15 Mar 2024 09:31:07 -0700 Subject: [PATCH 04/12] timeout --- core/provider/digitalocean/task.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/provider/digitalocean/task.go b/core/provider/digitalocean/task.go index 8710db1..0946f38 100644 --- a/core/provider/digitalocean/task.go +++ b/core/provider/digitalocean/task.go @@ -449,7 +449,7 @@ func startContainerWithBlock(ctx context.Context, dockerClient *dockerclient.Cli } // cancel container after a minute - waitCtx, cancel := context.WithTimeout(ctx, time.Minute) + waitCtx, cancel := context.WithTimeout(ctx, 3*time.Minute) defer cancel() ticker := time.NewTicker(100 * time.Millisecond) for { From bbd0ea9723b26e7e21c3f094b61d26b2905258db Mon Sep 17 00:00:00 2001 From: Nikhil Vasan Date: Fri, 15 Mar 2024 09:33:23 -0700 Subject: [PATCH 05/12] genesis timeout --- cosmos/node/genesis.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/node/genesis.go b/cosmos/node/genesis.go index 8101ace..fb79ec9 100644 --- a/cosmos/node/genesis.go +++ b/cosmos/node/genesis.go @@ -60,7 +60,7 @@ func (n *Node) AddGenesisAccount(ctx context.Context, address string, genesisAmo amount += fmt.Sprintf("%s%s", coin.Amount.String(), coin.Denom) } - ctx, cancel := context.WithTimeout(ctx, time.Minute) + ctx, cancel := context.WithTimeout(ctx, 3*time.Minute) defer cancel() var command []string From 3597fb8e8b1dee9ce29f7192aab25ceeb046e9f2 Mon Sep 17 00:00:00 2001 From: Nikhil Vasan Date: Fri, 15 Mar 2024 09:35:12 -0700 Subject: [PATCH 06/12] make validator creation task concurrent --- .../digitalocean/digitalocean_provider.go | 3 +- core/provider/task.go | 7 +- cosmos/chain/chain.go | 133 +++++++++++------- 3 files changed, 84 insertions(+), 59 deletions(-) diff --git a/core/provider/digitalocean/digitalocean_provider.go b/core/provider/digitalocean/digitalocean_provider.go index 6078429..8be03e1 100644 --- a/core/provider/digitalocean/digitalocean_provider.go +++ b/core/provider/digitalocean/digitalocean_provider.go @@ -77,11 +77,10 @@ func NewDigitalOceanProvider(ctx context.Context, logger *zap.Logger, providerNa firewall, err := digitalOceanProvider.createFirewall(ctx, userIPs) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to create firewall: %w", err) } digitalOceanProvider.firewallID = firewall.ID - _, err = digitalOceanProvider.createSSHKey(ctx, sshPubKey) if err != nil { diff --git a/core/provider/task.go b/core/provider/task.go index fa1ebd5..797c81b 100644 --- a/core/provider/task.go +++ b/core/provider/task.go @@ -57,11 +57,10 @@ func CreateTask(ctx context.Context, logger *zap.Logger, provider Provider, defi return task, nil } -// Start starts the underlying task's workload including its sidecars if startSidecars is set to true +// Start starts the underlying task's workload including its sidecars if startSidecars is set to true. +// This method does not take a lock on the provider, hence 2 threads may simultaneously call Start on the same task, +// this is not thread-safe: PLEASE DON'T DO THAT. func (t *Task) Start(ctx context.Context, startSidecars bool) error { - t.mu.Lock() - defer t.mu.Unlock() - if startSidecars { for _, sidecar := range t.Sidecars { err := sidecar.Start(ctx, startSidecars) diff --git a/cosmos/chain/chain.go b/cosmos/chain/chain.go index 3a72f44..966ed70 100644 --- a/cosmos/chain/chain.go +++ b/cosmos/chain/chain.go @@ -242,24 +242,32 @@ func (c *Chain) Init(ctx context.Context) error { for i := 1; i < len(c.Validators); i++ { validatorN := c.Validators[i] - bech32, err := validatorN.KeyBech32(ctx, petritypes.ValidatorKeyName, "acc") - - if err != nil { - return err - } - - c.logger.Info("setting up validator keys", zap.String("validator", validatorN.GetTask().Definition.Name), zap.String("address", bech32)) - if err := firstValidator.AddGenesisAccount(ctx, bech32, genesisAmounts); err != nil { - return err - } + validatorWalletAddress := c.ValidatorWallets[i].FormattedAddress() + eg.Go(func() error { + bech32, err := validatorN.KeyBech32(ctx, petritypes.ValidatorKeyName, "acc") + if err != nil { + return err + } + + c.logger.Info("setting up validator keys", zap.String("validator", validatorN.GetTask().Definition.Name), zap.String("address", bech32)) + if err := firstValidator.AddGenesisAccount(ctx, bech32, genesisAmounts); err != nil { + return err + } + + if err := firstValidator.AddGenesisAccount(ctx, validatorWalletAddress, genesisAmounts); err != nil { + return err + } + + if err := validatorN.CopyGenTx(ctx, firstValidator); err != nil { + return err + } - if err := firstValidator.AddGenesisAccount(ctx, c.ValidatorWallets[i].FormattedAddress(), genesisAmounts); err != nil { - return err - } + return nil + }) + } - if err := validatorN.CopyGenTx(ctx, firstValidator); err != nil { - return err - } + if err := eg.Wait(); err != nil { + return err } if err := firstValidator.CollectGenTxs(ctx); err != nil { @@ -267,7 +275,6 @@ func (c *Chain) Init(ctx context.Context) error { } genbz, err := firstValidator.GenesisFileContent(ctx) - if err != nil { return err } @@ -285,48 +292,68 @@ func (c *Chain) Init(ctx context.Context) error { return err } - if err != nil { - return err + for i := range c.Validators { + v := c.Validators[i] + eg.Go(func() error { + c.logger.Info("overwriting genesis for validator", zap.String("validator", v.GetTask().Definition.Name)) + if err := v.OverwriteGenesisFile(ctx, genbz); err != nil { + return err + } + if err := v.SetDefaultConfigs(ctx); err != nil { + return err + } + if err := v.SetPersistentPeers(ctx, peers); err != nil { + return err + } + return nil + }) + } + + for i := range c.Nodes { + n := c.Nodes[i] + eg.Go(func() error { + c.logger.Info("overwriting node genesis", zap.String("node", n.GetTask().Definition.Name)) + if err := n.OverwriteGenesisFile(ctx, genbz); err != nil { + return err + } + if err := n.SetDefaultConfigs(ctx); err != nil { + return err + } + if err := n.SetPersistentPeers(ctx, peers); err != nil { + return err + } + return nil + }) } - for _, v := range c.Validators { - c.logger.Info("overwriting genesis for validator", zap.String("validator", v.GetTask().Definition.Name)) - if err := v.OverwriteGenesisFile(ctx, genbz); err != nil { - return err - } - if err := v.SetDefaultConfigs(ctx); err != nil { - return err - } - if err := v.SetPersistentPeers(ctx, peers); err != nil { - return err - } + if err := eg.Wait(); err != nil { + return err } - for _, v := range c.Validators { - c.logger.Info("starting validator task", zap.String("validator", v.GetTask().Definition.Name)) - if err := v.GetTask().Start(ctx, true); err != nil { - return err - } + for i := range c.Validators { + v := c.Validators[i] + eg.Go(func() error { + c.logger.Info("starting validator task", zap.String("validator", v.GetTask().Definition.Name)) + if err := v.GetTask().Start(ctx, true); err != nil { + return err + } + return nil + }) } - for _, n := range c.Nodes { - c.logger.Info("overwriting node genesis", zap.String("node", n.GetTask().Definition.Name)) - if err := n.OverwriteGenesisFile(ctx, genbz); err != nil { - return err - } - if err := n.SetDefaultConfigs(ctx); err != nil { - return err - } - if err := n.SetPersistentPeers(ctx, peers); err != nil { - return err - } + for i := range c.Nodes { + n := c.Nodes[i] + eg.Go(func() error { + c.logger.Info("starting node task", zap.String("node", n.GetTask().Definition.Name)) + if err := n.GetTask().Start(ctx, true); err != nil { + return err + } + return nil + }) } - for _, n := range c.Nodes { - c.logger.Info("starting node task", zap.String("node", n.GetTask().Definition.Name)) - if err := n.GetTask().Start(ctx, true); err != nil { - return err - } + if err := eg.Wait(); err != nil { + return err } return nil @@ -436,7 +463,7 @@ func (c *Chain) WaitForHeight(ctx context.Context, desiredHeight uint64) error { c.logger.Info("waiting for height", zap.Uint64("desired_height", desiredHeight)) for { c.logger.Debug("waiting for height", zap.Uint64("desired_height", desiredHeight)) - + height, err := c.Height(ctx) if err != nil { c.logger.Error("failed to get height", zap.Error(err)) @@ -447,7 +474,7 @@ func (c *Chain) WaitForHeight(ctx context.Context, desiredHeight uint64) error { if height >= desiredHeight { break } - + // We assume the chain will eventually return a non-zero height, otherwise // this may block indefinitely. if height == 0 { From c10355ff7b5978dca7e95b87bc8ac4606da4707a Mon Sep 17 00:00:00 2001 From: Nikhil Vasan Date: Fri, 15 Mar 2024 09:35:34 -0700 Subject: [PATCH 07/12] linting --- cosmos/chain/chain.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/cosmos/chain/chain.go b/cosmos/chain/chain.go index 966ed70..f225a88 100644 --- a/cosmos/chain/chain.go +++ b/cosmos/chain/chain.go @@ -248,16 +248,16 @@ func (c *Chain) Init(ctx context.Context) error { if err != nil { return err } - + c.logger.Info("setting up validator keys", zap.String("validator", validatorN.GetTask().Definition.Name), zap.String("address", bech32)) if err := firstValidator.AddGenesisAccount(ctx, bech32, genesisAmounts); err != nil { return err } - + if err := firstValidator.AddGenesisAccount(ctx, validatorWalletAddress, genesisAmounts); err != nil { return err } - + if err := validatorN.CopyGenTx(ctx, firstValidator); err != nil { return err } @@ -308,7 +308,7 @@ func (c *Chain) Init(ctx context.Context) error { return nil }) } - + for i := range c.Nodes { n := c.Nodes[i] eg.Go(func() error { @@ -463,7 +463,7 @@ func (c *Chain) WaitForHeight(ctx context.Context, desiredHeight uint64) error { c.logger.Info("waiting for height", zap.Uint64("desired_height", desiredHeight)) for { c.logger.Debug("waiting for height", zap.Uint64("desired_height", desiredHeight)) - + height, err := c.Height(ctx) if err != nil { c.logger.Error("failed to get height", zap.Error(err)) @@ -474,7 +474,7 @@ func (c *Chain) WaitForHeight(ctx context.Context, desiredHeight uint64) error { if height >= desiredHeight { break } - + // We assume the chain will eventually return a non-zero height, otherwise // this may block indefinitely. if height == 0 { From 6ed6d8d56c71d4cf72aa158c4438c953e082acdf Mon Sep 17 00:00:00 2001 From: Nikhil Vasan Date: Fri, 15 Mar 2024 09:40:03 -0700 Subject: [PATCH 08/12] nit --- core/provider/digitalocean/digitalocean_provider.go | 1 - 1 file changed, 1 deletion(-) diff --git a/core/provider/digitalocean/digitalocean_provider.go b/core/provider/digitalocean/digitalocean_provider.go index 8be03e1..f06687e 100644 --- a/core/provider/digitalocean/digitalocean_provider.go +++ b/core/provider/digitalocean/digitalocean_provider.go @@ -50,7 +50,6 @@ func NewDigitalOceanProvider(ctx context.Context, logger *zap.Logger, providerNa if err != nil { return nil, err } - userIPs = append(userIPs) digitalOceanProvider := &Provider{ logger: logger.Named("digitalocean_provider"), From d340f2be771d668c16df2feea1983c0051cbb146 Mon Sep 17 00:00:00 2001 From: Nikhil Vasan Date: Fri, 15 Mar 2024 09:44:02 -0700 Subject: [PATCH 09/12] limit persistent peers to 20 --- cosmos/node/config.go | 28 +++++++++++++++++++++++++--- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/cosmos/node/config.go b/cosmos/node/config.go index 1540059..9774866 100644 --- a/cosmos/node/config.go +++ b/cosmos/node/config.go @@ -4,10 +4,13 @@ import ( "bytes" "context" "fmt" - toml "github.com/pelletier/go-toml/v2" - petritypes "github.com/skip-mev/petri/core/v2/types" + "math/rand" "reflect" + "strings" "time" + + toml "github.com/pelletier/go-toml/v2" + petritypes "github.com/skip-mev/petri/core/v2/types" ) type Toml map[string]any @@ -164,8 +167,12 @@ func (n *Node) SetDefaultConfigs(ctx context.Context) error { func (n *Node) SetPersistentPeers(ctx context.Context, peers string) error { cometBftConfig := make(Toml) + allPeers := strings.Split(peers, ",") + p2pConfig := make(Toml) - p2pConfig["persistent_peers"] = peers + + // return the filtered peers + p2pConfig["persistent_peers"] = filterPeers(allPeers) cometBftConfig["p2p"] = p2pConfig @@ -175,3 +182,18 @@ func (n *Node) SetPersistentPeers(ctx context.Context, peers string) error { cometBftConfig, ) } + +// filter peers returns a random subset of the given peers. The subset is determined as follows: +// 1. If the number of peers is less than or equal to 20, return all peers. +// 2. If the number of peers is greater than 20, return a random subset of 20 peers. +func filterPeers(allPeers []string) string { + if len(allPeers) <= 20 { + return strings.Join(allPeers, ",") + } + + rand.Shuffle(len(allPeers), func(i, j int) { + allPeers[i], allPeers[j] = allPeers[j], allPeers[i] + }) + + return strings.Join(allPeers[:20], ",") +} From 4cc2f0c74839d707330a28a54706baca801e20ec Mon Sep 17 00:00:00 2001 From: Nikhil Vasan Date: Fri, 15 Mar 2024 09:45:20 -0700 Subject: [PATCH 10/12] fix --- core/provider/digitalocean/digitalocean_provider.go | 1 - 1 file changed, 1 deletion(-) diff --git a/core/provider/digitalocean/digitalocean_provider.go b/core/provider/digitalocean/digitalocean_provider.go index 6078429..f95b4bc 100644 --- a/core/provider/digitalocean/digitalocean_provider.go +++ b/core/provider/digitalocean/digitalocean_provider.go @@ -50,7 +50,6 @@ func NewDigitalOceanProvider(ctx context.Context, logger *zap.Logger, providerNa if err != nil { return nil, err } - userIPs = append(userIPs) digitalOceanProvider := &Provider{ logger: logger.Named("digitalocean_provider"), From bbaca50cb517b589e8f56dc074ab2fc1ec854324 Mon Sep 17 00:00:00 2001 From: Nikhil Vasan Date: Wed, 13 Mar 2024 06:04:09 -0700 Subject: [PATCH 11/12] allow caching of static droplet ips --- core/provider/digitalocean/droplet.go | 9 ++++++--- core/provider/digitalocean/task.go | 4 ++-- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/core/provider/digitalocean/droplet.go b/core/provider/digitalocean/droplet.go index 35bd618..b4d54df 100644 --- a/core/provider/digitalocean/droplet.go +++ b/core/provider/digitalocean/droplet.go @@ -54,7 +54,7 @@ func (p *Provider) CreateDroplet(ctx context.Context, definition provider.TaskDe start := time.Now() - err = util.WaitForCondition(ctx, time.Second*100, time.Millisecond*100, func() (bool, error) { + err = util.WaitForCondition(ctx, time.Second*300, time.Millisecond*300, func() (bool, error) { d, _, err := p.doClient.Droplets.Get(ctx, droplet.ID) if err != nil { @@ -119,13 +119,16 @@ func (p *Provider) deleteDroplet(ctx context.Context, name string) error { return nil } -func (p *Provider) getDroplet(ctx context.Context, name string) (*godo.Droplet, error) { +func (p *Provider) getDroplet(ctx context.Context, name string, returnOnCacheHit bool) (*godo.Droplet, error) { cachedDroplet, ok := p.droplets.Load(name) - if !ok { return nil, fmt.Errorf("could not find droplet %s", name) } + if ok && returnOnCacheHit { + return cachedDroplet, nil + } + droplet, res, err := p.doClient.Droplets.Get(ctx, cachedDroplet.ID) if err != nil { diff --git a/core/provider/digitalocean/task.go b/core/provider/digitalocean/task.go index 0946f38..826cdcf 100644 --- a/core/provider/digitalocean/task.go +++ b/core/provider/digitalocean/task.go @@ -172,7 +172,7 @@ func (p *Provider) DestroyTask(ctx context.Context, taskName string) error { } func (p *Provider) GetTaskStatus(ctx context.Context, taskName string) (provider.TaskStatus, error) { - droplet, err := p.getDroplet(ctx, taskName) + droplet, err := p.getDroplet(ctx, taskName, false) if err != nil { return provider.TASK_STATUS_UNDEFINED, err @@ -296,7 +296,7 @@ func (p *Provider) DownloadDir(ctx context.Context, s string, s2 string, s3 stri } func (p *Provider) GetIP(ctx context.Context, taskName string) (string, error) { - droplet, err := p.getDroplet(ctx, taskName) + droplet, err := p.getDroplet(ctx, taskName, true) if err != nil { return "", err From afc76aca745982536872f23017329bce1d83f7eb Mon Sep 17 00:00:00 2001 From: Eric Warehime Date: Tue, 5 Nov 2024 12:25:05 -0800 Subject: [PATCH 12/12] Revert "limit persistent peers to 20" This reverts commit d340f2be771d668c16df2feea1983c0051cbb146. --- cosmos/node/config.go | 28 +++------------------------- 1 file changed, 3 insertions(+), 25 deletions(-) diff --git a/cosmos/node/config.go b/cosmos/node/config.go index 9774866..1540059 100644 --- a/cosmos/node/config.go +++ b/cosmos/node/config.go @@ -4,13 +4,10 @@ import ( "bytes" "context" "fmt" - "math/rand" - "reflect" - "strings" - "time" - toml "github.com/pelletier/go-toml/v2" petritypes "github.com/skip-mev/petri/core/v2/types" + "reflect" + "time" ) type Toml map[string]any @@ -167,12 +164,8 @@ func (n *Node) SetDefaultConfigs(ctx context.Context) error { func (n *Node) SetPersistentPeers(ctx context.Context, peers string) error { cometBftConfig := make(Toml) - allPeers := strings.Split(peers, ",") - p2pConfig := make(Toml) - - // return the filtered peers - p2pConfig["persistent_peers"] = filterPeers(allPeers) + p2pConfig["persistent_peers"] = peers cometBftConfig["p2p"] = p2pConfig @@ -182,18 +175,3 @@ func (n *Node) SetPersistentPeers(ctx context.Context, peers string) error { cometBftConfig, ) } - -// filter peers returns a random subset of the given peers. The subset is determined as follows: -// 1. If the number of peers is less than or equal to 20, return all peers. -// 2. If the number of peers is greater than 20, return a random subset of 20 peers. -func filterPeers(allPeers []string) string { - if len(allPeers) <= 20 { - return strings.Join(allPeers, ",") - } - - rand.Shuffle(len(allPeers), func(i, j int) { - allPeers[i], allPeers[j] = allPeers[j], allPeers[i] - }) - - return strings.Join(allPeers[:20], ",") -}