From 8fca398e2c5b9e646b6a246489b92527aad8d424 Mon Sep 17 00:00:00 2001 From: Konstantin L Date: Thu, 24 Mar 2016 04:28:06 +0700 Subject: [PATCH] Add concurrency models --- index.js | 27 ++++++----- package.json | 6 ++- utils.js | 132 ++++++++++++++++++++++++++++++++------------------- 3 files changed, 100 insertions(+), 65 deletions(-) diff --git a/index.js b/index.js index e638668..27f3f32 100755 --- a/index.js +++ b/index.js @@ -1,7 +1,5 @@ #!/usr/bin/env node -var childProcess = require('child_process'); -var Promise = require('bluebird'); var _ = require('lodash'); var chokidar = require('chokidar'); var utils = require('./utils'); @@ -25,7 +23,8 @@ var defaultOpts = { verbose: false, silent: false, initial: false, - command: null + command: null, + concurrency: 'kill' }; var VERSION = 'chokidar-cli: ' + require('./package.json').version + @@ -51,6 +50,14 @@ var argv = require('yargs') 'command will be replaced by the corresponding values from ' + 'the chokidar event.' }) + .option('concurrency', { + default: defaultOpts.concurrency, + describe: 'Command execution concurrency model.\n' + + '- kill: kills unfinished process before starting a new one.\n' + + '- queue: waits until previously started process is finished before starting a new one.\n' + + '- parallel: executes subsequent commands in parallel.', + choices: ['kill', 'queue', 'parallel'] + }) .option('d', { alias: 'debounce', default: defaultOpts.debounce, @@ -134,13 +141,16 @@ function getUserOpts(argv) { return argv; } -// Estimates spent working hours based on commit dates function startWatching(opts) { var chokidarOpts = createChokidarOpts(opts); var watcher = chokidar.watch(opts.patterns, chokidarOpts); + var runner = utils.runner(opts.concurrency); + var run = runner.run.bind(runner); + var throttledRun = _.throttle(run, opts.throttle); var debouncedRun = _.debounce(throttledRun, opts.debounce); + watcher.on('all', function(event, path) { var description = EVENT_DESCRIPTIONS[event] + ':'; @@ -152,7 +162,6 @@ function startWatching(opts) { } } - // XXX: commands might be still run concurrently if (opts.command) { debouncedRun( opts.command @@ -211,12 +220,4 @@ function _resolveIgnoreOpt(ignoreOpt) { }); } -function run(cmd) { - return utils.run(cmd) - .catch(function(err) { - console.error('Error when executing', cmd); - console.error(err.stack); - }); -} - main(); diff --git a/package.json b/package.json index 7c4ebae..7440696 100644 --- a/package.json +++ b/package.json @@ -31,10 +31,12 @@ "license": "MIT", "dependencies": { "anymatch": "^1.1.0", - "bluebird": "^2.9.24", + "async": "^2.0.0-rc.1", + "bluebird": "^3.3.4", "chokidar": "^1.0.1", + "colors": "^1.1.2", + "exec-sh": "^0.2.0", "lodash": "^3.7.0", - "shell-quote": "^1.4.3", "yargs": "^3.7.2" }, "devDependencies": { diff --git a/utils.js b/utils.js index 1df8ab3..b65dac6 100644 --- a/utils.js +++ b/utils.js @@ -1,63 +1,95 @@ -var childProcess = require('child_process'); -var _ = require('lodash'); var Promise = require('bluebird'); -var shellQuote = require('shell-quote'); - -// Try to resolve path to shell. -// We assume that Windows provides COMSPEC env variable -// and other platforms provide SHELL env variable -var SHELL_PATH = process.env.SHELL || process.env.COMSPEC; -var EXECUTE_OPTION = process.env.COMSPEC !== undefined && process.env.SHELL === undefined ? '/c' : '-c'; - -// XXX: Wrapping tos to a promise is a bit wrong abstraction. Maybe RX suits -// better? -function run(cmd, opts) { - if (!SHELL_PATH) { - // If we cannot resolve shell, better to just crash - throw new Error('$SHELL environment variable is not set.'); - } - - opts = _.merge({ - pipe: true, - cwd: undefined, - callback: function(child) { - // Since we return promise, we need to provide - // this callback if one wants to access the child - // process reference - // Called immediately after successful child process - // spawn - } - }, opts); +var execSh = require('exec-sh'); +var colors = require('colors'); +var async = require('async'); +var _ = require('lodash'); +var colors = require('colors'); - return new Promise(function(resolve, reject) { - var child; +// Allow to cancel bluebird promises +Promise.config({ + cancellation: true +}); - try { - child = childProcess.spawn(SHELL_PATH, [EXECUTE_OPTION, cmd], { - cwd: opts.cwd, - stdio: opts.pipe ? 'inherit' : null - }); - } catch (e) { - return Promise.reject(e); - } +// Execute command as cancellable promise +function exec(task, finish) { + task.promise = new Promise(function(resolve, reject, onCancel) { + var process = execSh(task.cmd, {}, function(err, stdout, stderr) { + // Avoid issues with killing exited process + process = undefined; + + // No need to reject/resolve if promise was cancelled and process was killed + if (task.promise.isCancelled()) { + return; + } + + if (err) { + // Error code !== 0 + console.log(('Error code: ' + err.code + ' for run #' + task.number).red); + reject(err, stdout, stderr); + } + else { + console.log(('Finished run #' + task.number).green); + resolve(); + } + }); + + onCancel(function () { + console.log(('Cancelled run #' + task.number).yellow); + if (process) { + console.log(('Killing run #' + task.number).yellow); + + // XXX: should we send SIGKILL or account for sub-processes somehow + // or is it really as simple as this + process.kill(); + } + }); + }); + + task.promise.finally(function () { + // Tell the queue that we're finished + finish(); + }); +} + +// Create runner based on specified concurrency model +function runner(concurrencyModel) { + // async.queue does not support unlimited concurrent tasks. + // Set sane (?) default - 100 tasks for parallel mode + var concurrency = ('parallel' === concurrencyModel) ? 100 : 1; - opts.callback(child); + // Create worker queue + var queue = async.queue(exec, concurrency); + var taskNumber = 1; - function errorHandler(err) { - child.removeListener('close', closeHandler); - reject(err); + var run = function (cmd) { + // In queue mode: we don't want to queue more than one + // extra task to be run after filesystem stops changing + if ('queue' === concurrencyModel && queue.length()) { + console.log(('Command is already queued, skipping').blue); + + return; } - function closeHandler(exitCode) { - child.removeListener('error', errorHandler); - resolve(exitCode); + // In kill mode: cancel running task before pushing new one + if ('kill' === concurrencyModel && queue.running()) { + _.each(queue.workersList(), function (worker) { + worker.data.promise.cancel(); + }); } - child.once('error', errorHandler); - child.once('close', closeHandler); - }); + console.log(('Adding task #' + taskNumber + ' to the queue').inverse); + queue.push({ + cmd: cmd, + number: taskNumber++ + }); + }; + + // Return new runner with run method + return { + run: run + }; } module.exports = { - run: run + runner: runner };