Skip to content

Commit

Permalink
feat: add dependency installer service and enhance task runner with d…
Browse files Browse the repository at this point in the history
…ependency management

- Introduced a new DependencyInstallerService interface to define methods for managing dependency installation commands.
- Implemented registry service for managing the DependencyInstallerService instance.
- Enhanced the task runner to install dependencies if available, including command execution and logging for stdout and stderr.
- Improved error handling and logging throughout the task runner's dependency installation process.
- Updated the runner's methods to utilize the new dependency management features, ensuring better integration and functionality.
  • Loading branch information
tikazyq committed Jan 1, 2025
1 parent 136daff commit b056105
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 42 deletions.
15 changes: 15 additions & 0 deletions core/dependency/registry_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package dependency

import (
"github.com/crawlab-team/crawlab/core/interfaces"
)

var serviceInstance interfaces.DependencyInstallerService

func SetDependencyInstallerRegistryService(svc interfaces.DependencyInstallerService) {
serviceInstance = svc
}

func GetDependencyInstallerRegistryService() interfaces.DependencyInstallerService {
return serviceInstance
}
10 changes: 10 additions & 0 deletions core/interfaces/dependency_installer_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package interfaces

import (
"go.mongodb.org/mongo-driver/bson/primitive"
"os/exec"
)

type DependencyInstallerService interface {
GetInstallDependencyRequirementsCmdBySpiderId(id primitive.ObjectID) (cmd *exec.Cmd, err error)
}
185 changes: 143 additions & 42 deletions core/task/handler/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"sync"
"time"

"github.com/crawlab-team/crawlab/core/dependency"
"github.com/crawlab-team/crawlab/core/fs"
"github.com/hashicorp/go-multierror"

Expand All @@ -32,6 +33,57 @@ import (
"go.mongodb.org/mongo-driver/bson/primitive"
)

// newTaskRunner creates a new task runner instance with the specified task ID
// It initializes all necessary components and establishes required connections
func newTaskRunner(id primitive.ObjectID, svc *Service) (r *Runner, err error) {
// validate options
if id.IsZero() {
err = fmt.Errorf("invalid task id: %s", id.Hex())
return nil, err
}

// runner
r = &Runner{
subscribeTimeout: 30 * time.Second,
bufferSize: 1024 * 1024,
svc: svc,
tid: id,
ch: make(chan constants.TaskSignal),
logBatchSize: 20,
Logger: utils.NewLogger("TaskRunner"),
}

// multi error
var errs multierror.Error

// task
r.t, err = svc.GetTaskById(id)
if err != nil {
errs.Errors = append(errs.Errors, err)
} else {
// spider
r.s, err = svc.GetSpiderById(r.t.SpiderId)
if err != nil {
errs.Errors = append(errs.Errors, err)
} else {
// task fs service
r.fsSvc = fs.NewFsService(filepath.Join(utils.GetWorkspace(), r.s.Id.Hex()))
}
}

// Initialize context and done channel
r.ctx, r.cancel = context.WithCancel(context.Background())
r.done = make(chan struct{})

// initialize task runner
if err := r.Init(); err != nil {
r.Errorf("error initializing task runner: %v", err)
errs.Errors = append(errs.Errors, err)
}

return r, errs.ErrorOrNil()
}

// Runner represents a task execution handler that manages the lifecycle of a running task
type Runner struct {
// dependencies
Expand Down Expand Up @@ -106,6 +158,11 @@ func (r *Runner) Run() (err error) {
}
}

// install dependencies
if err := r.installDependenciesIfAvailable(); err != nil {
r.Warnf("error installing dependencies: %v", err)
}

// configure cmd
err = r.configureCmd()
if err != nil {
Expand Down Expand Up @@ -289,7 +346,7 @@ func (r *Runner) configureNodePath() {
home, err := os.UserHomeDir()
if err != nil {
r.Errorf("error getting user home directory: %v", err)
home = "/root" // fallback to root if can't get home dir
home = "/root" // fallback to root if it can't get home dir
}

// Configure nvm-based Node.js paths
Expand Down Expand Up @@ -630,7 +687,7 @@ func (r *Runner) updateTask(status string, e error) (err error) {
if e != nil {
r.t.Error = e.Error()
}
if r.svc.GetNodeConfigService().IsMaster() {
if utils.IsMaster() {
err = service.NewModelService[models.Task]().ReplaceById(r.t.Id, *r.t)
if err != nil {
return err
Expand Down Expand Up @@ -720,7 +777,7 @@ func (r *Runner) _updateTaskStat(status string) {
ts.RuntimeDuration = ts.EndTs.Sub(ts.StartTs).Milliseconds()
ts.TotalDuration = ts.EndTs.Sub(ts.CreatedAt).Milliseconds()
}
if r.svc.GetNodeConfigService().IsMaster() {
if utils.IsMaster() {
err = service.NewModelService[models.TaskStat]().ReplaceById(ts.Id, *ts)
if err != nil {
r.Errorf("error updating task stat: %v", err)
Expand Down Expand Up @@ -790,7 +847,7 @@ func (r *Runner) _updateSpiderStat(status string) {
}

// perform update
if r.svc.GetNodeConfigService().IsMaster() {
if utils.IsMaster() {
err = service.NewModelService[models.SpiderStat]().UpdateById(r.s.Id, update)
if err != nil {
r.Errorf("error updating spider stat: %v", err)
Expand Down Expand Up @@ -982,55 +1039,79 @@ func (r *Runner) handleIPCInsertDataMessage(ipcMsg entity.IPCMessage) {
}
}

// newTaskRunner creates a new task runner instance with the specified task ID
// It initializes all necessary components and establishes required connections
func newTaskRunner(id primitive.ObjectID, svc *Service) (r *Runner, err error) {
// validate options
if id.IsZero() {
err = fmt.Errorf("invalid task id: %s", id.Hex())
return nil, err
func (r *Runner) installDependenciesIfAvailable() (err error) {
if !utils.IsPro() {
return nil
}

// runner
r = &Runner{
subscribeTimeout: 30 * time.Second,
bufferSize: 1024 * 1024,
svc: svc,
tid: id,
ch: make(chan constants.TaskSignal),
logBatchSize: 20,
Logger: utils.NewLogger("TaskRunner"),
depSvc := dependency.GetDependencyInstallerRegistryService()
if depSvc == nil {
r.Warnf("dependency installer service not available")
return nil
}

// multi error
var errs multierror.Error
cmd, err := depSvc.GetInstallDependencyRequirementsCmdBySpiderId(r.s.Id)
if err != nil {
return err
}
if cmd == nil {
return nil
}

// task
r.t, err = svc.GetTaskById(id)
// Set up pipes for stdout and stderr
stdout, err := cmd.StdoutPipe()
if err != nil {
errs.Errors = append(errs.Errors, err)
} else {
// spider
r.s, err = svc.GetSpiderById(r.t.SpiderId)
if err != nil {
errs.Errors = append(errs.Errors, err)
} else {
// task fs service
r.fsSvc = fs.NewFsService(filepath.Join(utils.GetWorkspace(), r.s.Id.Hex()))
}
r.Errorf("error creating stdout pipe for dependency installation: %v", err)
return err
}
stderr, err := cmd.StderrPipe()
if err != nil {
r.Errorf("error creating stderr pipe for dependency installation: %v", err)
return err
}

// Initialize context and done channel
r.ctx, r.cancel = context.WithCancel(context.Background())
r.done = make(chan struct{})
// Start the command
r.Infof("installing dependencies for spider: %s", r.s.Id.Hex())
r.Infof("command for dependencies installation: %s", cmd.String())
if err := cmd.Start(); err != nil {
r.Errorf("error starting dependency installation command: %v", err)
return err
}

// initialize task runner
if err := r.Init(); err != nil {
r.Errorf("error initializing task runner: %v", err)
errs.Errors = append(errs.Errors, err)
// Create wait group for log readers
var wg sync.WaitGroup
wg.Add(2)

// Read stdout
go func() {
defer wg.Done()
scanner := bufio.NewScanner(stdout)
for scanner.Scan() {
line := scanner.Text()
r.Info(line)
}
}()

// Read stderr
go func() {
defer wg.Done()
scanner := bufio.NewScanner(stderr)
for scanner.Scan() {
line := scanner.Text()
r.Error(line)
}
}()

// Wait for command to complete
if err := cmd.Wait(); err != nil {
r.Errorf("dependency installation failed: %v", err)
return err
}

return r, errs.ErrorOrNil()
// Wait for log readers to finish
wg.Wait()

return nil
}

// logInternally sends internal runner logs to the same logging system as the task
Expand Down Expand Up @@ -1062,6 +1143,26 @@ func (r *Runner) logInternally(level string, message string) {
}
}

func (r *Runner) Error(message string) {
msg := fmt.Sprintf(message)
r.logInternally("ERROR", msg)
}

func (r *Runner) Warn(message string) {
msg := fmt.Sprintf(message)
r.logInternally("WARN", msg)
}

func (r *Runner) Info(message string) {
msg := fmt.Sprintf(message)
r.logInternally("INFO", msg)
}

func (r *Runner) Debug(message string) {
msg := fmt.Sprintf(message)
r.logInternally("DEBUG", msg)
}

func (r *Runner) Errorf(format string, args ...interface{}) {
msg := fmt.Sprintf(format, args...)
r.logInternally("ERROR", msg)
Expand Down

0 comments on commit b056105

Please sign in to comment.