diff --git a/cmd/root.go b/cmd/root.go index 5ef67fa7ba..378369cf68 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -19,6 +19,7 @@ import ( "github.com/fatih/color" "github.com/pingcap/tiup/pkg/environment" + "github.com/pingcap/tiup/pkg/exec" "github.com/pingcap/tiup/pkg/localdata" "github.com/pingcap/tiup/pkg/repository" "github.com/pingcap/tiup/pkg/version" @@ -98,7 +99,7 @@ the latest stable version will be downloaded from the repository.`, break } } - return runComponent(env, tag, componentSpec, binPath, transparentParams) + return exec.RunComponent(env, tag, componentSpec, binPath, transparentParams) } return cmd.Help() }, diff --git a/cmd/telemetry.go b/cmd/telemetry.go index e29f419c4b..6c15d55806 100644 --- a/cmd/telemetry.go +++ b/cmd/telemetry.go @@ -2,29 +2,12 @@ package cmd import ( "fmt" - "os" - "path/filepath" "github.com/pingcap/tiup/pkg/environment" - "github.com/pingcap/tiup/pkg/localdata" "github.com/pingcap/tiup/pkg/telemetry" "github.com/spf13/cobra" ) -const telemetryFname = "meta.yaml" - -func getTelemetryMeta(env *environment.Environment) (meta *telemetry.Meta, fname string, err error) { - dir := env.Profile().Path(localdata.TelemetryDir) - err = os.MkdirAll(dir, 0755) - if err != nil { - return - } - - fname = filepath.Join(dir, telemetryFname) - meta, err = telemetry.LoadFrom(fname) - return -} - func newTelemetryCmd() *cobra.Command { cmd := &cobra.Command{ Use: "telemetry", @@ -36,7 +19,7 @@ func newTelemetryCmd() *cobra.Command { Short: "Reset the uuid used for telemetry", RunE: func(cmd *cobra.Command, args []string) error { env := environment.GlobalEnv() - teleMeta, fname, err := getTelemetryMeta(env) + teleMeta, fname, err := telemetry.GetMeta(env) if err != nil { return err } @@ -57,7 +40,7 @@ func newTelemetryCmd() *cobra.Command { Short: "Enable telemetry of tiup", RunE: func(cmd *cobra.Command, args []string) error { env := environment.GlobalEnv() - teleMeta, fname, err := getTelemetryMeta(env) + teleMeta, fname, err := telemetry.GetMeta(env) if err != nil { return err } @@ -78,7 +61,7 @@ func newTelemetryCmd() *cobra.Command { Short: "Disable telemetry of tiup", RunE: func(cmd *cobra.Command, args []string) error { env := environment.GlobalEnv() - teleMeta, fname, err := getTelemetryMeta(env) + teleMeta, fname, err := telemetry.GetMeta(env) if err != nil { return err } @@ -99,7 +82,7 @@ func newTelemetryCmd() *cobra.Command { Short: "Display the current status of tiup telemetry", RunE: func(cmd *cobra.Command, args []string) error { env := environment.GlobalEnv() - teleMeta, _, err := getTelemetryMeta(env) + teleMeta, _, err := telemetry.GetMeta(env) if err != nil { return err } diff --git a/components/playground/grafana.go b/components/playground/grafana.go index 196ff05578..b973034a28 100644 --- a/components/playground/grafana.go +++ b/components/playground/grafana.go @@ -19,13 +19,14 @@ import ( "io/ioutil" "os" "os/exec" + "path" "path/filepath" "regexp" "strings" "github.com/pingcap/errors" - "github.com/pingcap/tiup/components/playground/instance" - "github.com/pingcap/tiup/pkg/localdata" + "github.com/pingcap/tiup/pkg/environment" + tiupexec "github.com/pingcap/tiup/pkg/exec" "github.com/pingcap/tiup/pkg/repository/v0manifest" "github.com/pingcap/tiup/pkg/utils" ) @@ -188,24 +189,22 @@ http_port = %d } args := []string{ - "tiup", - instance.CompVersion("grafana", v0manifest.Version(g.version)), "--homepath", dir, "--config", customeFName, + fmt.Sprintf("cfg:default.paths.logs=%s", path.Join(dir, "log")), } - cmd := exec.CommandContext(ctx, args[0], args[1:]...) - cmd.Env = append( - os.Environ(), - fmt.Sprintf("%s=%s", localdata.EnvNameInstanceDataDir, dir), - ) + env := environment.GlobalEnv() + cmd, err := tiupexec.PrepareCommand(ctx, "grafana", v0manifest.Version(g.version), "", "", dir, args, env) + if err != nil { + return errors.AddStack(err) + } + cmd.Stdout = nil + cmd.Stderr = nil g.cmd = cmd - g.cmd.Stderr = os.Stderr - g.cmd.Stderr = os.Stdout - fmt.Println("start grafana: ", strings.Join(args, " ")) return g.cmd.Start() } diff --git a/components/playground/instance/drainer.go b/components/playground/instance/drainer.go index 3a273bd1f8..1933215997 100644 --- a/components/playground/instance/drainer.go +++ b/components/playground/instance/drainer.go @@ -28,7 +28,7 @@ import ( type Drainer struct { instance pds []*PDInstance - *Process + Process } var _ Instance = &Drainer{} @@ -82,8 +82,6 @@ func (d *Drainer) Start(ctx context.Context, version v0manifest.Version) error { } args := []string{ - "tiup", fmt.Sprintf("--binpath=%s", d.BinPath), - CompVersion("drainer", version), fmt.Sprintf("--node-id=%s", d.NodeID()), fmt.Sprintf("--addr=%s:%d", d.Host, d.Port), fmt.Sprintf("--advertise-addr=%s:%d", advertiseHost(d.Host), d.Port), @@ -94,8 +92,11 @@ func (d *Drainer) Start(ctx context.Context, version v0manifest.Version) error { args = append(args, fmt.Sprintf("--config=%s", d.ConfigPath)) } - d.Process = NewProcess(ctx, d.Dir, args[0], args[1:]...) - logIfErr(d.Process.setOutputFile(d.LogFile())) + var err error + if d.Process, err = NewComponentProcess(ctx, d.Dir, d.BinPath, "drainer", version, args...); err != nil { + return err + } + logIfErr(d.Process.SetOutputFile(d.LogFile())) return d.Process.Start() } diff --git a/components/playground/instance/instance.go b/components/playground/instance/instance.go index 72da8aae80..8fd012fb5c 100644 --- a/components/playground/instance/instance.go +++ b/components/playground/instance/instance.go @@ -52,7 +52,7 @@ type Instance interface { // StatusAddrs return the address to pull metrics. StatusAddrs() []string // Wait Should only call this if the instance is started successfully. - Wait() error + Wait(ctx context.Context) error } func (inst *instance) StatusAddrs() (addrs []string) { diff --git a/components/playground/instance/pd.go b/components/playground/instance/pd.go index 1852a2b158..2f348523b6 100644 --- a/components/playground/instance/pd.go +++ b/components/playground/instance/pd.go @@ -30,7 +30,7 @@ type PDInstance struct { instance initEndpoints []*PDInstance joinEndpoints []*PDInstance - *Process + Process } // NewPDInstance return a PDInstance @@ -72,8 +72,6 @@ func (inst *PDInstance) Start(ctx context.Context, version v0manifest.Version) e } uid := inst.Name() args := []string{ - "tiup", fmt.Sprintf("--binpath=%s", inst.BinPath), - CompVersion("pd", version), "--name=" + uid, fmt.Sprintf("--data-dir=%s", filepath.Join(inst.Dir, "data")), fmt.Sprintf("--peer-urls=http://%s:%d", inst.Host, inst.Port), @@ -103,8 +101,11 @@ func (inst *PDInstance) Start(ctx context.Context, version v0manifest.Version) e return errors.Errorf("must set the init or join instances.") } - inst.Process = NewProcess(ctx, inst.Dir, args[0], args[1:]...) - logIfErr(inst.Process.setOutputFile(inst.LogFile())) + var err error + if inst.Process, err = NewComponentProcess(ctx, inst.Dir, inst.BinPath, "pd", version, args...); err != nil { + return err + } + logIfErr(inst.Process.SetOutputFile(inst.LogFile())) return inst.Process.Start() } diff --git a/components/playground/instance/process.go b/components/playground/instance/process.go index bf31872492..4d6a3359c1 100644 --- a/components/playground/instance/process.go +++ b/components/playground/instance/process.go @@ -2,41 +2,60 @@ package instance import ( "context" - "fmt" "io" "os" "os/exec" "time" "github.com/pingcap/errors" - "github.com/pingcap/tiup/pkg/localdata" + "github.com/pingcap/tiup/pkg/environment" + tiupexec "github.com/pingcap/tiup/pkg/exec" + "github.com/pingcap/tiup/pkg/repository/v0manifest" ) -// Process represent process to be run by playground. -type Process struct { +// ErrorWaitTimeout is used to represent timeout of a command +// Example: +// _ = syscall.Kill(cmd.Process.Pid, syscall.SIGKILL) +// if err := WaitContext(context.WithTimeout(context.Background(), 3), cmd); err == ErrorWaitTimeout { +// // Do something +// } +var ErrorWaitTimeout = errors.New("wait command timeout") + +// Process represent process to be run by playground +type Process interface { + Start() error + Wait(ctx context.Context) error + Pid() int + Uptime() string + SetOutputFile(fname string) error + Cmd() *exec.Cmd +} + +// process implementes Process +type process struct { cmd *exec.Cmd startTime time.Time } // Start the process -func (p *Process) Start() error { +func (p *process) Start() error { // fmt.Printf("Starting `%s`: %s", filepath.Base(p.cmd.Path), strings.Join(p.cmd.Args, " ")) p.startTime = time.Now() return p.cmd.Start() } // Wait implements Instance interface. -func (p *Process) Wait() error { - return p.cmd.Wait() +func (p *process) Wait(ctx context.Context) error { + return WaitContext(ctx, p.cmd) } // Pid implements Instance interface. -func (p *Process) Pid() int { +func (p *process) Pid() int { return p.cmd.Process.Pid } // Uptime implements Instance interface. -func (p *Process) Uptime() string { +func (p *process) Uptime() string { s := p.cmd.ProcessState if s != nil && s.Exited() { return "exited" @@ -46,7 +65,7 @@ func (p *Process) Uptime() string { return duration.String() } -func (p *Process) setOutputFile(fname string) error { +func (p *process) SetOutputFile(fname string) error { f, err := os.OpenFile(fname, os.O_RDWR|os.O_CREATE, 0666) if err != nil { return errors.AddStack(err) @@ -55,27 +74,46 @@ func (p *Process) setOutputFile(fname string) error { return nil } -func (p *Process) setOutput(w io.Writer) { +func (p *process) setOutput(w io.Writer) { p.cmd.Stdout = w p.cmd.Stderr = w } -// NewProcess create a Process instance. -func NewProcess(ctx context.Context, dir string, name string, arg ...string) *Process { +func (p *process) Cmd() *exec.Cmd { + return p.cmd +} + +// NewComponentProcess create a Process instance. +func NewComponentProcess(ctx context.Context, dir, binPath, component string, version v0manifest.Version, arg ...string) (Process, error) { if dir == "" { panic("dir must be set") } - cmd := exec.CommandContext(ctx, name, arg...) - cmd.Env = append( - os.Environ(), - fmt.Sprintf("%s=%s", localdata.EnvNameInstanceDataDir, dir), - ) + env := environment.GlobalEnv() + cmd, err := tiupexec.PrepareCommand(ctx, component, version, binPath, "", dir, arg, env) + if err != nil { + return nil, err + } - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr + return &process{cmd: cmd}, nil +} - return &Process{ - cmd: cmd, +// WaitContext wrap cmd.Wait with context +func WaitContext(ctx context.Context, cmd *exec.Cmd) error { + // We use cmd.Process.Wait instead of cmd.Wait because cmd.Wait is not reenterable + c := make(chan error, 1) + go func() { + if cmd == nil || cmd.Process == nil { + c <- nil + } else { + _, err := cmd.Process.Wait() + c <- err + } + }() + select { + case <-ctx.Done(): + return ErrorWaitTimeout + case err := <-c: + return err } } diff --git a/components/playground/instance/pump.go b/components/playground/instance/pump.go index abbeff9147..8e2ac15d6d 100644 --- a/components/playground/instance/pump.go +++ b/components/playground/instance/pump.go @@ -30,7 +30,7 @@ import ( type Pump struct { instance pds []*PDInstance - *Process + Process } var _ Instance = &Pump{} @@ -93,8 +93,6 @@ func (p *Pump) Start(ctx context.Context, version v0manifest.Version) error { } args := []string{ - "tiup", fmt.Sprintf("--binpath=%s", p.BinPath), - CompVersion("pump", version), fmt.Sprintf("--node-id=%s", p.NodeID()), fmt.Sprintf("--addr=%s:%d", p.Host, p.Port), fmt.Sprintf("--advertise-addr=%s:%d", advertiseHost(p.Host), p.Port), @@ -105,8 +103,11 @@ func (p *Pump) Start(ctx context.Context, version v0manifest.Version) error { args = append(args, fmt.Sprintf("--config=%s", p.ConfigPath)) } - p.Process = NewProcess(ctx, p.Dir, args[0], args[1:]...) - logIfErr(p.Process.setOutputFile(p.LogFile())) + var err error + if p.Process, err = NewComponentProcess(ctx, p.Dir, p.BinPath, "pump", version, args...); err != nil { + return err + } + logIfErr(p.Process.SetOutputFile(p.LogFile())) return p.Process.Start() } diff --git a/components/playground/instance/tidb.go b/components/playground/instance/tidb.go index 14ac1f1002..82feb5a661 100644 --- a/components/playground/instance/tidb.go +++ b/components/playground/instance/tidb.go @@ -29,7 +29,7 @@ import ( type TiDBInstance struct { instance pds []*PDInstance - *Process + Process enableBinlog bool } @@ -60,8 +60,6 @@ func (inst *TiDBInstance) Start(ctx context.Context, version v0manifest.Version) endpoints = append(endpoints, fmt.Sprintf("%s:%d", inst.Host, pd.StatusPort)) } args := []string{ - "tiup", fmt.Sprintf("--binpath=%s", inst.BinPath), - CompVersion("tidb", version), "-P", strconv.Itoa(inst.Port), "--store=tikv", fmt.Sprintf("--host=%s", inst.Host), @@ -76,8 +74,11 @@ func (inst *TiDBInstance) Start(ctx context.Context, version v0manifest.Version) args = append(args, "--enable-binlog=true") } - inst.Process = NewProcess(ctx, inst.Dir, args[0], args[1:]...) - logIfErr(inst.Process.setOutputFile(inst.LogFile())) + var err error + if inst.Process, err = NewComponentProcess(ctx, inst.Dir, inst.BinPath, "tidb", version, args...); err != nil { + return err + } + logIfErr(inst.Process.SetOutputFile(inst.LogFile())) return inst.Process.Start() } diff --git a/components/playground/instance/tiflash.go b/components/playground/instance/tiflash.go index e87df2c8cf..aa5f684ef9 100644 --- a/components/playground/instance/tiflash.go +++ b/components/playground/instance/tiflash.go @@ -43,7 +43,7 @@ type TiFlashInstance struct { ProxyConfigPath string pds []*PDInstance dbs []*TiDBInstance - *Process + Process } // NewTiFlashInstance return a TiFlashInstance @@ -178,14 +178,14 @@ func (inst *TiFlashInstance) Start(ctx context.Context, version v0manifest.Versi } args := []string{ - "tiup", fmt.Sprintf("--binpath=%s", inst.BinPath), - CompVersion("tiflash", version), "server", fmt.Sprintf("--config-file=%s", inst.ConfigPath), } - inst.Process = NewProcess(ctx, inst.Dir, args[0], args[1:]...) - logIfErr(inst.Process.setOutputFile(inst.LogFile())) + if inst.Process, err = NewComponentProcess(ctx, inst.Dir, inst.BinPath, "tiflash", version, args...); err != nil { + return err + } + logIfErr(inst.Process.SetOutputFile(inst.LogFile())) return inst.Process.Start() } @@ -202,7 +202,7 @@ func (inst *TiFlashInstance) LogFile() string { // Cmd returns the internal Cmd instance func (inst *TiFlashInstance) Cmd() *exec.Cmd { - return inst.cmd + return inst.Process.Cmd() } // StoreAddr return the store address of TiFlash diff --git a/components/playground/instance/tikv.go b/components/playground/instance/tikv.go index 72948b9851..c10fc4504a 100644 --- a/components/playground/instance/tikv.go +++ b/components/playground/instance/tikv.go @@ -30,7 +30,7 @@ import ( type TiKVInstance struct { instance pds []*PDInstance - *Process + Process } // NewTiKVInstance return a TiKVInstance @@ -67,8 +67,6 @@ func (inst *TiKVInstance) Start(ctx context.Context, version v0manifest.Version) endpoints = append(endpoints, fmt.Sprintf("http://%s:%d", advertiseHost(inst.Host), pd.StatusPort)) } args := []string{ - "tiup", fmt.Sprintf("--binpath=%s", inst.BinPath), - CompVersion("tikv", version), fmt.Sprintf("--addr=%s:%d", inst.Host, inst.Port), fmt.Sprintf("--advertise-addr=%s:%d", advertiseHost(inst.Host), inst.Port), fmt.Sprintf("--status-addr=%s:%d", inst.Host, inst.StatusPort), @@ -78,8 +76,11 @@ func (inst *TiKVInstance) Start(ctx context.Context, version v0manifest.Version) fmt.Sprintf("--log-file=%s", inst.LogFile()), } - inst.Process = NewProcess(ctx, inst.Dir, args[0], args[1:]...) - logIfErr(inst.Process.setOutputFile(inst.LogFile())) + var err error + if inst.Process, err = NewComponentProcess(ctx, inst.Dir, inst.BinPath, "tikv", version, args...); err != nil { + return err + } + logIfErr(inst.Process.SetOutputFile(inst.LogFile())) return inst.Process.Start() } diff --git a/components/playground/main.go b/components/playground/main.go index 4f9b44e44c..e6b218a7a1 100644 --- a/components/playground/main.go +++ b/components/playground/main.go @@ -21,7 +21,6 @@ import ( "net/http" _ "net/http/pprof" "os" - "os/exec" "path/filepath" "strconv" "strings" @@ -32,8 +31,9 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tiup/components/playground/instance" "github.com/pingcap/tiup/pkg/cluster/api" + "github.com/pingcap/tiup/pkg/environment" "github.com/pingcap/tiup/pkg/localdata" - "github.com/pingcap/tiup/pkg/repository/v0manifest" + "github.com/pingcap/tiup/pkg/repository" "github.com/pingcap/tiup/pkg/utils" "github.com/spf13/cobra" "go.etcd.io/etcd/clientv3" @@ -53,33 +53,21 @@ type bootOptions struct { } func installIfMissing(profile *localdata.Profile, component, version string) error { - versions, err := profile.InstalledVersions(component) + env := environment.GlobalEnv() + + installed, err := env.V1Repository().Local().ComponentInstalled(component, version) if err != nil { return err } - if len(versions) > 0 { - if v0manifest.Version(version).IsEmpty() { - return nil - } - found := false - for _, v := range versions { - if v == version { - found = true - break - } - } - if found { - return nil - } + if installed { + return nil } - spec := component - if !v0manifest.Version(version).IsEmpty() { - spec = fmt.Sprintf("%s:%s", component, version) + + spec := repository.ComponentSpec{ + ID: component, + Version: version, } - c := exec.Command("tiup", "install", spec) - c.Stdout = os.Stdout - c.Stderr = os.Stderr - return c.Run() + return env.V1Repository().UpdateComponents([]repository.ComponentSpec{spec}) } func execute() error { @@ -130,7 +118,14 @@ Examples: if err != nil { return errors.AddStack(err) } - return p.bootCluster(opt) + + env, err := environment.InitEnv(repository.Options{}) + if err != nil { + return errors.AddStack(err) + } + environment.SetGlobalEnv(env) + + return p.bootCluster(env, opt) }, } diff --git a/components/playground/monitor.go b/components/playground/monitor.go index ec3269636e..4abd6cd07b 100644 --- a/components/playground/monitor.go +++ b/components/playground/monitor.go @@ -23,8 +23,8 @@ import ( "path/filepath" "github.com/pingcap/errors" - "github.com/pingcap/tiup/components/playground/instance" - "github.com/pingcap/tiup/pkg/localdata" + "github.com/pingcap/tiup/pkg/environment" + tiupexec "github.com/pingcap/tiup/pkg/exec" "github.com/pingcap/tiup/pkg/repository/v0manifest" "github.com/pingcap/tiup/pkg/utils" ) @@ -121,18 +121,17 @@ scrape_configs: } args := []string{ - "tiup", - instance.CompVersion("prometheus", v0manifest.Version(version)), fmt.Sprintf("--config.file=%s", filepath.Join(dir, "prometheus.yml")), fmt.Sprintf("--web.external-url=http://%s", addr), fmt.Sprintf("--web.listen-address=%s:%d", host, port), fmt.Sprintf("--storage.tsdb.path='%s'", filepath.Join(dir, "data")), } - cmd := exec.CommandContext(ctx, args[0], args[1:]...) - cmd.Env = append( - os.Environ(), - fmt.Sprintf("%s=%s", localdata.EnvNameInstanceDataDir, dir), - ) + + env := environment.GlobalEnv() + cmd, err := tiupexec.PrepareCommand(ctx, "prometheus", v0manifest.Version(version), "", "", dir, args, env) + if err != nil { + return 0, nil, err + } m.port = port m.cmd = cmd diff --git a/components/playground/playground.go b/components/playground/playground.go index 6256f66c7e..a671f8838e 100755 --- a/components/playground/playground.go +++ b/components/playground/playground.go @@ -27,6 +27,7 @@ import ( "runtime" "strconv" "strings" + "sync/atomic" "syscall" "text/tabwriter" "time" @@ -39,17 +40,21 @@ import ( "github.com/pingcap/tiup/components/playground/utils" "github.com/pingcap/tiup/pkg/cluster/api" "github.com/pingcap/tiup/pkg/cluster/clusterutil" + "github.com/pingcap/tiup/pkg/environment" "github.com/pingcap/tiup/pkg/localdata" "github.com/pingcap/tiup/pkg/repository/v0manifest" "golang.org/x/mod/semver" "golang.org/x/sync/errgroup" ) +// The deadline a process should quit after receives kill signal +const killDeadline = time.Second * 16 + // Playground represent the playground of a cluster. type Playground struct { booted bool + curSig int32 bootOptions *bootOptions - profile *localdata.Profile port int pds []*instance.PDInstance @@ -65,11 +70,17 @@ type Playground struct { monitor *monitor } +// MonitorInfo represent the monitor +type MonitorInfo struct { + IP string `json:"ip"` + Port int `json:"port"` + BinaryPath string `json:"binary_path"` +} + // NewPlayground create a Playground instance. func NewPlayground(port int) *Playground { return &Playground{ port: port, - profile: localdata.InitProfile(), idAlloc: make(map[string]int), } } @@ -388,8 +399,8 @@ func (p *Playground) startInstance(ctx context.Context, inst instance.Instance) func (p *Playground) addWaitInstance(inst instance.Instance) { p.instanceWaiter.Go(func() error { - err := inst.Wait() - if err != nil { + err := inst.Wait(context.TODO()) + if err != nil && atomic.LoadInt32(&p.curSig) == 0 { fmt.Print(color.RedString("%s quit: %s\n", inst.Component(), err.Error())) if lines, _ := utils.TailN(inst.LogFile(), 10); len(lines) > 0 { for _, line := range lines { @@ -478,8 +489,8 @@ func (p *Playground) RWalkInstances(fn func(componentID string, ins instance.Ins return nil }) - for i := 0; i < len(ids); i++ { - err := fn(ids[i], instances[i]) + for i := len(ids); i > 0; i-- { + err := fn(ids[i-1], instances[i-1]) if err != nil { return errors.AddStack(err) } @@ -545,11 +556,6 @@ func (p *Playground) addInstance(componentID string, cfg instance.Config) (ins i cfg.ConfigPath = getAbsolutePath(cfg.ConfigPath) } - err = installIfMissing(p.profile, componentID, p.bootOptions.version) - if err != nil { - return nil, errors.Annotatef(err, "failed to install %s", componentID) - } - dataDir := os.Getenv(localdata.EnvNameInstanceDataDir) if dataDir == "" { return nil, fmt.Errorf("cannot read environment variable %s", localdata.EnvNameInstanceDataDir) @@ -603,7 +609,7 @@ func (p *Playground) addInstance(componentID string, cfg instance.Config) (ins i return } -func (p *Playground) bootCluster(options *bootOptions) error { +func (p *Playground) bootCluster(env *environment.Environment, options *bootOptions) error { p.bootOptions = options if options.pd.Num < 1 || options.tidb.Num < 1 || options.tikv.Num < 1 { @@ -611,7 +617,20 @@ func (p *Playground) bootCluster(options *bootOptions) error { options.pd.Num, options.tikv.Num, options.pd.Num) } - if options.version != "nightly" && options.version != "" { + if options.version == "" { + version, _, err := env.V1Repository().LatestStableVersion("tidb") + if err != nil { + return err + } + options.version = version.String() + + fmt.Println(color.YellowString(`Use latest stable version: %s + + You can specified the version by running "tiup playground ", example: "tiup playground v4.0.0" +`, options.version)) + } + + if options.version != "nightly" { if semver.Compare("v3.1.0", options.version) > 0 && options.tiflash.Num != 0 { fmt.Println(color.YellowString("Warning: current version %s doesn't support TiFlash", options.version)) options.tiflash.Num = 0 @@ -664,103 +683,12 @@ func (p *Playground) bootCluster(options *bootOptions) error { fmt.Println("Playground Bootstrapping...") - monitorInfo := struct { - IP string `json:"ip"` - Port int `json:"port"` - BinaryPath string `json:"binary_path"` - }{} - var monitorCmd *exec.Cmd var grafana *grafana + var monitorInfo *MonitorInfo if options.monitor { - // set up prometheus - if err := installIfMissing(p.profile, "prometheus", options.version); err != nil { - return err - } - - dataDir := os.Getenv(localdata.EnvNameInstanceDataDir) - promDir := filepath.Join(dataDir, "prometheus") - - monitor := newMonitor() - port, cmd, err := monitor.startMonitor(ctx, options.version, options.host, promDir) - if err != nil { - return err - } - p.monitor = monitor - - monitorInfo.IP = options.host - monitorInfo.BinaryPath = promDir - monitorInfo.Port = port - - monitorCmd = cmd - go func() { - log, err := os.OpenFile(filepath.Join(promDir, "prom.log"), os.O_WRONLY|os.O_APPEND|os.O_CREATE, os.ModePerm) - if err != nil { - fmt.Println("Monitor system start failed", err) - return - } - defer log.Close() - - cmd.Stderr = log - cmd.Stdout = os.Stdout - - // fmt.Println("Start Prometheus instance...") - if err := cmd.Start(); err != nil { - fmt.Println("Monitor system start failed", err) - return - } - }() - - // set up grafana - if err := installIfMissing(p.profile, "grafana", options.version); err != nil { - return err - } - installPath, err := p.profile.ComponentInstalledPath("grafana", v0manifest.Version(options.version)) - if err != nil { - return errors.AddStack(err) - } - - dataDir = os.Getenv(localdata.EnvNameInstanceDataDir) - grafanaDir := filepath.Join(dataDir, "grafana") - - cmd = exec.Command("cp", "-r", installPath, grafanaDir) - err = cmd.Run() - if err != nil { - return errors.AddStack(err) - } - - dashboardDir := filepath.Join(grafanaDir, "dashboards") - err = os.MkdirAll(dashboardDir, 0755) - if err != nil { - return errors.AddStack(err) - } - - // mv {grafanaDir}/*.json {grafanaDir}/dashboards/ - err = filepath.Walk(grafanaDir, func(path string, info os.FileInfo, err error) error { - // skip scan sub directory - if info.IsDir() && path != grafanaDir { - return filepath.SkipDir - } - - if strings.HasSuffix(info.Name(), ".json") { - return os.Rename(path, filepath.Join(grafanaDir, "dashboards", info.Name())) - } - - return nil - }) - if err != nil { - return errors.AddStack(err) - } - - err = replaceDatasource(dashboardDir, clusterName) - if err != nil { - return errors.AddStack(err) - } - - grafana = newGrafana(options.version, options.host) - // fmt.Println("Start Grafana instance...") - err = grafana.start(ctx, grafanaDir, fmt.Sprintf("http://%s:%d", monitorInfo.IP, monitorInfo.Port)) - if err != nil { + var err error + if monitorCmd, monitorInfo, grafana, err = p.bootMonitor(ctx, env); err != nil { return errors.AddStack(err) } } @@ -864,7 +792,7 @@ func (p *Playground) bootCluster(options *bootOptions) error { fmt.Println(color.GreenString("To view the dashboard: http://%s/dashboard", pdAddr)) } - if monitorInfo.IP != "" && len(p.pds) != 0 { + if monitorInfo != nil && len(p.pds) != 0 { client, err := newEtcdClient(p.pds[0].Addr()) if err == nil && client != nil { promBinary, err := json.Marshal(monitorInfo) @@ -884,23 +812,16 @@ func (p *Playground) bootCluster(options *bootOptions) error { time.Sleep(20 * time.Second) fmt.Println("Early terminated via failpoint") - _ = p.WalkInstances(func(_ string, inst instance.Instance) error { - _ = syscall.Kill(inst.Pid(), syscall.SIGKILL) - return nil - }) - if monitorCmd != nil { - _ = syscall.Kill(monitorCmd.Process.Pid, syscall.SIGKILL) + extraCmds := []*exec.Cmd{} + if grafana != nil { + extraCmds = append(extraCmds, grafana.cmd) } - - fmt.Println("Wait all processes terminated") - _ = p.WalkInstances(func(_ string, inst instance.Instance) error { - _ = inst.Wait() - return nil - }) if monitorCmd != nil { - _ = monitorCmd.Wait() + extraCmds = append(extraCmds, monitorCmd) } + p.terminate(syscall.SIGKILL, extraCmds...) + return nil }) @@ -912,18 +833,21 @@ func (p *Playground) bootCluster(options *bootOptions) error { syscall.SIGTERM, syscall.SIGQUIT) sig := (<-sc).(syscall.Signal) - if sig != syscall.SIGINT { - // Need more graceful kill by stop components one by one? - _ = p.RWalkInstances(func(_ string, inst instance.Instance) error { - _ = syscall.Kill(inst.Pid(), sig) - return nil - }) - if monitorCmd != nil { - _ = syscall.Kill(monitorCmd.Process.Pid, sig) - } - if grafana != nil { - _ = syscall.Kill(grafana.cmd.Process.Pid, sig) - } + atomic.StoreInt32(&p.curSig, int32(sig)) + extraCmds := []*exec.Cmd{} + if grafana != nil { + extraCmds = append(extraCmds, grafana.cmd) + } + if monitorCmd != nil { + extraCmds = append(extraCmds, monitorCmd) + } + go p.terminate(sig, extraCmds...) + + // If user try double ctrl+c, force quit + sig = (<-sc).(syscall.Signal) + atomic.StoreInt32(&p.curSig, int32(syscall.SIGKILL)) + if sig == syscall.SIGINT { + p.terminate(syscall.SIGKILL, extraCmds...) } }() @@ -940,7 +864,7 @@ func (p *Playground) bootCluster(options *bootOptions) error { if grafana != nil { p.instanceWaiter.Go(func() error { err := grafana.cmd.Wait() - if err != nil { + if err != nil && atomic.LoadInt32(&p.curSig) == 0 { fmt.Printf("Grafana quit: %v\n", err) } return err @@ -950,12 +874,12 @@ func (p *Playground) bootCluster(options *bootOptions) error { // Wait all instance quit and return the first non-nil err. err = p.instanceWaiter.Wait() - if err != nil { + if err != nil && atomic.LoadInt32(&p.curSig) == 0 { return err } if monitorCmd != nil { - if err := monitorCmd.Wait(); err != nil { + if err := monitorCmd.Wait(); err != nil && atomic.LoadInt32(&p.curSig) == 0 { fmt.Println("Monitor system wait failed", err) } } @@ -963,6 +887,35 @@ func (p *Playground) bootCluster(options *bootOptions) error { return nil } +func (p *Playground) terminate(sig syscall.Signal, extraCmds ...*exec.Cmd) { + _ = p.RWalkInstances(func(_ string, inst instance.Instance) error { + if sig != syscall.SIGINT { + _ = syscall.Kill(inst.Pid(), sig) + } + if sig == syscall.SIGKILL { + fmt.Printf("Force %s(%d) to quit...\n", inst.Component(), inst.Pid()) + } else if atomic.LoadInt32(&p.curSig) == int32(sig) { // In case of double ctr+c + fmt.Printf("Wait %s(%d) to quit...\n", inst.Component(), inst.Pid()) + } + ctx, cancel := context.WithTimeout(context.Background(), killDeadline) + defer cancel() + if err := inst.Wait(ctx); err == instance.ErrorWaitTimeout { + _ = syscall.Kill(inst.Pid(), syscall.SIGKILL) + } + return nil + }) + for _, cmd := range extraCmds { + if sig != syscall.SIGINT { + _ = syscall.Kill(cmd.Process.Pid, sig) + } + ctx, cancel := context.WithTimeout(context.Background(), killDeadline) + defer cancel() + if err := instance.WaitContext(ctx, cmd); err == instance.ErrorWaitTimeout { + _ = syscall.Kill(cmd.Process.Pid, syscall.SIGKILL) + } + } +} + func (p *Playground) renderSDFile() error { // we not start monitor at all. if p.monitor == nil { @@ -986,6 +939,99 @@ func (p *Playground) renderSDFile() error { return nil } +func (p *Playground) bootMonitor(ctx context.Context, env *environment.Environment) (*exec.Cmd, *MonitorInfo, *grafana, error) { + options := p.bootOptions + monitorInfo := &MonitorInfo{} + + dataDir := os.Getenv(localdata.EnvNameInstanceDataDir) + promDir := filepath.Join(dataDir, "prometheus") + + monitor := newMonitor() + port, cmd, err := monitor.startMonitor(ctx, options.version, options.host, promDir) + if err != nil { + return nil, nil, nil, err + } + p.monitor = monitor + + monitorInfo.IP = options.host + monitorInfo.BinaryPath = promDir + monitorInfo.Port = port + + monitorCmd := cmd + go func() { + log, err := os.OpenFile(filepath.Join(promDir, "prom.log"), os.O_WRONLY|os.O_APPEND|os.O_CREATE, os.ModePerm) + if err != nil { + fmt.Println("Monitor system start failed", err) + return + } + defer log.Close() + + cmd.Stderr = log + cmd.Stdout = os.Stdout + + // fmt.Println("Start Prometheus instance...") + if err := cmd.Start(); err != nil { + fmt.Println("Monitor system start failed", err) + return + } + }() + + // set up grafana + if err := installIfMissing(env.Profile(), "grafana", options.version); err != nil { + return nil, nil, nil, errors.AddStack(err) + } + installPath, err := env.Profile().ComponentInstalledPath("grafana", v0manifest.Version(options.version)) + if err != nil { + return nil, nil, nil, errors.AddStack(err) + } + + dataDir = os.Getenv(localdata.EnvNameInstanceDataDir) + grafanaDir := filepath.Join(dataDir, "grafana") + + cmd = exec.Command("cp", "-r", installPath, grafanaDir) + err = cmd.Run() + if err != nil { + return nil, nil, nil, errors.AddStack(err) + } + + dashboardDir := filepath.Join(grafanaDir, "dashboards") + err = os.MkdirAll(dashboardDir, 0755) + if err != nil { + return nil, nil, nil, errors.AddStack(err) + } + + // mv {grafanaDir}/*.json {grafanaDir}/dashboards/ + err = filepath.Walk(grafanaDir, func(path string, info os.FileInfo, err error) error { + // skip scan sub directory + if info.IsDir() && path != grafanaDir { + return filepath.SkipDir + } + + if strings.HasSuffix(info.Name(), ".json") { + return os.Rename(path, filepath.Join(grafanaDir, "dashboards", info.Name())) + } + + return nil + }) + if err != nil { + return nil, nil, nil, errors.AddStack(err) + } + + err = replaceDatasource(dashboardDir, clusterName) + if err != nil { + return nil, nil, nil, errors.AddStack(err) + } + + grafana := newGrafana(options.version, options.host) + // fmt.Println("Start Grafana instance...") + err = grafana.start(ctx, grafanaDir, fmt.Sprintf("http://%s:%d", monitorInfo.IP, monitorInfo.Port)) + if err != nil { + return nil, nil, nil, errors.AddStack(err) + } + + return monitorCmd, monitorInfo, grafana, nil +} + func logIfErr(err error) { if err != nil { fmt.Println(err) diff --git a/cmd/run.go b/pkg/exec/run.go similarity index 90% rename from cmd/run.go rename to pkg/exec/run.go index bc22769b96..eaadaffe49 100644 --- a/cmd/run.go +++ b/pkg/exec/run.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package cmd +package exec import ( "context" @@ -31,10 +31,12 @@ import ( "github.com/pingcap/tiup/pkg/environment" "github.com/pingcap/tiup/pkg/localdata" "github.com/pingcap/tiup/pkg/repository/v0manifest" + "github.com/pingcap/tiup/pkg/telemetry" "golang.org/x/mod/semver" ) -func runComponent(env *environment.Environment, tag, spec, binPath string, args []string) error { +// RunComponent start a component and wait it +func RunComponent(env *environment.Environment, tag, spec, binPath string, args []string) error { component, version := environment.ParseCompVersion(spec) if !env.IsSupportedComponent(component) { return fmt.Errorf("component `%s` does not support `%s/%s` (see `tiup list`)", component, runtime.GOOS, runtime.GOARCH) @@ -127,7 +129,15 @@ func base62Tag() string { return string(b) } -func launchComponent(ctx context.Context, component string, version v0manifest.Version, binPath string, tag string, args []string, env *environment.Environment) (*localdata.Process, error) { +// PrepareCommand will download necessary component and returns a *exec.Cmd +func PrepareCommand( + ctx context.Context, + component string, + version v0manifest.Version, + binPath, tag, wd string, + args []string, + env *environment.Environment, +) (*exec.Cmd, error) { selectVer, err := env.DownloadComponentIfMissing(component, version) if err != nil { return nil, err @@ -157,7 +167,6 @@ func launchComponent(ctx context.Context, component string, version v0manifest.V } } - wd := os.Getenv(localdata.EnvNameInstanceDataDir) if wd == "" { // Generate a tag for current instance if the tag doesn't specified if tag == "" { @@ -179,7 +188,7 @@ func launchComponent(ctx context.Context, component string, version v0manifest.V return nil, err } - teleMeta, _, err := getTelemetryMeta(env) + teleMeta, _, err := telemetry.GetMeta(env) if err != nil { return nil, err } @@ -206,13 +215,23 @@ func launchComponent(ctx context.Context, component string, version v0manifest.V c.Stderr = os.Stderr c.Dir = wd + return c, nil +} + +func launchComponent(ctx context.Context, component string, version v0manifest.Version, binPath string, tag string, args []string, env *environment.Environment) (*localdata.Process, error) { + wd := os.Getenv(localdata.EnvNameInstanceDataDir) + c, err := PrepareCommand(ctx, component, version, binPath, tag, wd, args, env) + if err != nil { + return nil, err + } + p := &localdata.Process{ Component: component, CreatedTime: time.Now().Format(time.RFC3339), Exec: binPath, Args: args, - Dir: wd, - Env: envs, + Dir: c.Dir, + Env: c.Env, Cmd: c, } diff --git a/pkg/repository/v1_repository.go b/pkg/repository/v1_repository.go index 3928a03e2b..3680df7924 100644 --- a/pkg/repository/v1_repository.go +++ b/pkg/repository/v1_repository.go @@ -676,6 +676,39 @@ func (r *V1Repository) ComponentVersion(id, version string) (*v1manifest.Version return vi, nil } +// LatestStableVersion returns the latest stable version of specific component +func (r *V1Repository) LatestStableVersion(id string) (v0manifest.Version, *v1manifest.VersionItem, error) { + com, err := r.FetchComponentManifest(id) + if err != nil { + return "", nil, err + } + + versions := com.Platforms[r.PlatformString()] + if versions == nil { + versions = com.Platforms[v1manifest.AnyPlatform] + if versions == nil { + return "", nil, fmt.Errorf("component %s doesn't support platform %s", id, r.PlatformString()) + } + } + + var last string + for v := range versions { + if v0manifest.Version(v).IsNightly() { + continue + } + + if last == "" || semver.Compare(last, v) < 0 { + last = v + } + } + + if last == "" { + return "", nil, fmt.Errorf("component %s doesn't has a stable version", id) + } + + return v0manifest.Version(last), com.VersionItem(r.PlatformString(), last), nil +} + // BinaryPath return the binary path of the component. // Support you have install the component, need to get entry from local manifest. // Load the manifest locally only to get then Entry, do not force do something need access mirror. diff --git a/pkg/telemetry/meta.go b/pkg/telemetry/meta.go index d56d01f271..d0bf597d40 100644 --- a/pkg/telemetry/meta.go +++ b/pkg/telemetry/meta.go @@ -1,14 +1,32 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + package telemetry import ( "io/ioutil" "os" + "path/filepath" "github.com/google/uuid" "github.com/pingcap/errors" + "github.com/pingcap/tiup/pkg/environment" + "github.com/pingcap/tiup/pkg/localdata" "gopkg.in/yaml.v2" ) +const telemetryFname = "meta.yaml" + // Status of telemetry. type Status string @@ -68,3 +86,16 @@ func (m *Meta) SaveTo(fname string) error { return ioutil.WriteFile(fname, data, 0644) } + +// GetMeta read the telemeta from disk +func GetMeta(env *environment.Environment) (meta *Meta, fname string, err error) { + dir := env.Profile().Path(localdata.TelemetryDir) + err = os.MkdirAll(dir, 0755) + if err != nil { + return + } + + fname = filepath.Join(dir, telemetryFname) + meta, err = LoadFrom(fname) + return +}