From b056105246a3d520bdfece5f80a1aabfeccefee3 Mon Sep 17 00:00:00 2001
From: Marvin Zhang
Date: Wed, 1 Jan 2025 20:51:55 +0800
Subject: [PATCH] feat: add dependency installer service and enhance task
 runner with dependency management

- Introduced a new DependencyInstallerService interface to define methods for managing dependency installation commands.
- Implemented registry service for managing the DependencyInstallerService instance.
- Enhanced the task runner to install dependencies if available, including command execution and logging for stdout and stderr.
- Improved error handling and logging throughout the task runner's dependency installation process.
- Updated the runner's methods to utilize the new dependency management features, ensuring better integration and functionality.
---
 core/dependency/registry_service.go           |  15 ++
 .../dependency_installer_service.go           |  10 +
 core/task/handler/runner.go                   | 182 ++++++++++++++----
 3 files changed, 165 insertions(+), 42 deletions(-)
 create mode 100644 core/dependency/registry_service.go
 create mode 100644 core/interfaces/dependency_installer_service.go

diff --git a/core/dependency/registry_service.go b/core/dependency/registry_service.go
new file mode 100644
index 00000000..e78acd66
--- /dev/null
+++ b/core/dependency/registry_service.go
@@ -0,0 +1,15 @@
+package dependency
+
+import (
+	"github.com/crawlab-team/crawlab/core/interfaces"
+)
+
+var serviceInstance interfaces.DependencyInstallerService
+
+func SetDependencyInstallerRegistryService(svc interfaces.DependencyInstallerService) {
+	serviceInstance = svc
+}
+
+func GetDependencyInstallerRegistryService() interfaces.DependencyInstallerService {
+	return serviceInstance
+}
diff --git a/core/interfaces/dependency_installer_service.go b/core/interfaces/dependency_installer_service.go
new file mode 100644
index 00000000..4e866046
--- /dev/null
+++ b/core/interfaces/dependency_installer_service.go
@@ -0,0 +1,10 @@
+package interfaces
+
+import (
+	"go.mongodb.org/mongo-driver/bson/primitive"
+	"os/exec"
+)
+
+type DependencyInstallerService interface {
+	GetInstallDependencyRequirementsCmdBySpiderId(id primitive.ObjectID) (cmd *exec.Cmd, err error)
+}
diff --git a/core/task/handler/runner.go b/core/task/handler/runner.go
index 0e374670..bcb64933 100644
--- a/core/task/handler/runner.go
+++ b/core/task/handler/runner.go
@@ -15,6 +15,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/crawlab-team/crawlab/core/dependency"
 	"github.com/crawlab-team/crawlab/core/fs"
 	"github.com/hashicorp/go-multierror"
 
@@ -32,6 +33,57 @@ import (
 	"go.mongodb.org/mongo-driver/bson/primitive"
 )
 
+// newTaskRunner creates a new task runner instance with the specified task ID
+// It initializes all necessary components and establishes required connections
+func newTaskRunner(id primitive.ObjectID, svc *Service) (r *Runner, err error) {
+	// validate options
+	if id.IsZero() {
+		err = fmt.Errorf("invalid task id: %s", id.Hex())
+		return nil, err
+	}
+
+	// runner
+	r = &Runner{
+		subscribeTimeout: 30 * time.Second,
+		bufferSize:       1024 * 1024,
+		svc:              svc,
+		tid:              id,
+		ch:               make(chan constants.TaskSignal),
+		logBatchSize:     20,
+		Logger:           utils.NewLogger("TaskRunner"),
+	}
+
+	// multi error
+	var errs multierror.Error
+
+	// task
+	r.t, err = svc.GetTaskById(id)
+	if err != nil {
+		errs.Errors = append(errs.Errors, err)
+	} else {
+		// spider
+		r.s, err = svc.GetSpiderById(r.t.SpiderId)
+		if err != nil {
+			errs.Errors = append(errs.Errors, err)
+		} else {
+			// task fs service
+			r.fsSvc = fs.NewFsService(filepath.Join(utils.GetWorkspace(), r.s.Id.Hex()))
+		}
+	}
+
+	// Initialize context and done channel
+	r.ctx, r.cancel = context.WithCancel(context.Background())
+	r.done = make(chan struct{})
+
+	// initialize task runner
+	if err := r.Init(); err != nil {
+		r.Errorf("error initializing task runner: %v", err)
+		errs.Errors = append(errs.Errors, err)
+	}
+
+	return r, errs.ErrorOrNil()
+}
+
 // Runner represents a task execution handler that manages the lifecycle of a running task
 type Runner struct {
 	// dependencies
@@ -106,6 +158,11 @@ func (r *Runner) Run() (err error) {
 		}
 	}
 
+	// install dependencies
+	if err := r.installDependenciesIfAvailable(); err != nil {
+		r.Warnf("error installing dependencies: %v", err)
+	}
+
 	// configure cmd
 	err = r.configureCmd()
 	if err != nil {
@@ -289,7 +346,7 @@ func (r *Runner) configureNodePath() {
 	home, err := os.UserHomeDir()
 	if err != nil {
 		r.Errorf("error getting user home directory: %v", err)
-		home = "/root" // fallback to root if can't get home dir
+		home = "/root" // fallback to root if it can't get home dir
 	}
 
 	// Configure nvm-based Node.js paths
@@ -630,7 +687,7 @@ func (r *Runner) updateTask(status string, e error) (err error) {
 		if e != nil {
 			r.t.Error = e.Error()
 		}
-		if r.svc.GetNodeConfigService().IsMaster() {
+		if utils.IsMaster() {
 			err = service.NewModelService[models.Task]().ReplaceById(r.t.Id, *r.t)
 			if err != nil {
 				return err
@@ -720,7 +777,7 @@ func (r *Runner) _updateTaskStat(status string) {
 		ts.RuntimeDuration = ts.EndTs.Sub(ts.StartTs).Milliseconds()
 		ts.TotalDuration = ts.EndTs.Sub(ts.CreatedAt).Milliseconds()
 	}
-	if r.svc.GetNodeConfigService().IsMaster() {
+	if utils.IsMaster() {
 		err = service.NewModelService[models.TaskStat]().ReplaceById(ts.Id, *ts)
 		if err != nil {
 			r.Errorf("error updating task stat: %v", err)
@@ -790,7 +847,7 @@ func (r *Runner) _updateSpiderStat(status string) {
 	}
 
 	// perform update
-	if r.svc.GetNodeConfigService().IsMaster() {
+	if utils.IsMaster() {
 		err = service.NewModelService[models.SpiderStat]().UpdateById(r.s.Id, update)
 		if err != nil {
 			r.Errorf("error updating spider stat: %v", err)
@@ -982,55 +1039,76 @@ func (r *Runner) handleIPCInsertDataMessage(ipcMsg entity.IPCMessage) {
 	}
 }
 
-// newTaskRunner creates a new task runner instance with the specified task ID
-// It initializes all necessary components and establishes required connections
-func newTaskRunner(id primitive.ObjectID, svc *Service) (r *Runner, err error) {
-	// validate options
-	if id.IsZero() {
-		err = fmt.Errorf("invalid task id: %s", id.Hex())
-		return nil, err
+// installDependenciesIfAvailable runs the dependency installation command for
+// the runner's spider and forwards its stdout/stderr to the task log.
+// It is a no-op (nil) outside the Pro edition, when no installer service is
+// registered, or when the service returns no command. Any failure to build,
+// start or complete the command is returned to the caller.
+func (r *Runner) installDependenciesIfAvailable() (err error) {
+	if !utils.IsPro() {
+		return nil
 	}
 
-	// runner
-	r = &Runner{
-		subscribeTimeout: 30 * time.Second,
-		bufferSize:       1024 * 1024,
-		svc:              svc,
-		tid:              id,
-		ch:               make(chan constants.TaskSignal),
-		logBatchSize:     20,
-		Logger:           utils.NewLogger("TaskRunner"),
+	depSvc := dependency.GetDependencyInstallerRegistryService()
+	if depSvc == nil {
+		r.Warnf("dependency installer service not available")
+		return nil
 	}
 
-	// multi error
-	var errs multierror.Error
+	cmd, err := depSvc.GetInstallDependencyRequirementsCmdBySpiderId(r.s.Id)
+	if err != nil {
+		return err
+	}
+	if cmd == nil {
+		return nil
+	}
 
-	// task
-	r.t, err = svc.GetTaskById(id)
+	// Set up pipes for stdout and stderr
+	stdout, err := cmd.StdoutPipe()
 	if err != nil {
-		errs.Errors = append(errs.Errors, err)
-	} else {
-		// spider
-		r.s, err = svc.GetSpiderById(r.t.SpiderId)
-		if err != nil {
-			errs.Errors = append(errs.Errors, err)
-		} else {
-			// task fs service
-			r.fsSvc = fs.NewFsService(filepath.Join(utils.GetWorkspace(), r.s.Id.Hex()))
-		}
+		r.Errorf("error creating stdout pipe for dependency installation: %v", err)
+		return err
+	}
+	stderr, err := cmd.StderrPipe()
+	if err != nil {
+		r.Errorf("error creating stderr pipe for dependency installation: %v", err)
+		return err
 	}
 
-	// Initialize context and done channel
-	r.ctx, r.cancel = context.WithCancel(context.Background())
-	r.done = make(chan struct{})
+	// Start the command
+	r.Infof("installing dependencies for spider: %s", r.s.Id.Hex())
+	r.Infof("command for dependencies installation: %s", cmd.String())
+	if err := cmd.Start(); err != nil {
+		r.Errorf("error starting dependency installation command: %v", err)
+		return err
+	}
 
-	// initialize task runner
-	if err := r.Init(); err != nil {
-		r.Errorf("error initializing task runner: %v", err)
-		errs.Errors = append(errs.Errors, err)
+	// Forward stdout (INFO) and stderr (ERROR) to the task log concurrently
+	var wg sync.WaitGroup
+	wg.Add(2)
+	stream := func(scanner *bufio.Scanner, log func(string)) {
+		defer wg.Done()
+		for scanner.Scan() {
+			log(scanner.Text())
+		}
+		if err := scanner.Err(); err != nil {
+			r.Warnf("error reading dependency installation output: %v", err)
+		}
+	}
+	go stream(bufio.NewScanner(stdout), r.Info)
+	go stream(bufio.NewScanner(stderr), r.Error)
+
+	// Drain the pipes before calling Wait: Wait closes them once the command
+	// exits, so waiting first could drop output that has not been read yet.
+	wg.Wait()
+
+	// Wait for command to complete
+	if err := cmd.Wait(); err != nil {
+		r.Errorf("dependency installation failed: %v", err)
+		return err
 	}
 
-	return r, errs.ErrorOrNil()
+	return nil
 }
 
 // logInternally sends internal runner logs to the same logging system as the task
@@ -1062,6 +1140,26 @@ func (r *Runner) logInternally(level string, message string) {
 	}
 }
 
+// Error logs message verbatim (not as a format string) with level "ERROR".
+func (r *Runner) Error(message string) {
+	r.logInternally("ERROR", message)
+}
+
+// Warn logs message verbatim (not as a format string) with level "WARN".
+func (r *Runner) Warn(message string) {
+	r.logInternally("WARN", message)
+}
+
+// Info logs message verbatim (not as a format string) with level "INFO".
+func (r *Runner) Info(message string) {
+	r.logInternally("INFO", message)
+}
+
+// Debug logs message verbatim (not as a format string) with level "DEBUG".
+func (r *Runner) Debug(message string) {
+	r.logInternally("DEBUG", message)
+}
+
 func (r *Runner) Errorf(format string, args ...interface{}) {
 	msg := fmt.Sprintf(format, args...)
 	r.logInternally("ERROR", msg)