diff --git a/packages/nexrender-worker/src/index.js b/packages/nexrender-worker/src/index.js index aa63ad65..e88bcfc5 100644 --- a/packages/nexrender-worker/src/index.js +++ b/packages/nexrender-worker/src/index.js @@ -1,193 +1,5 @@ -const { createClient } = require('@nexrender/api') -const { init, render } = require('@nexrender/core') -const { getRenderingStatus } = require('@nexrender/types/job') -const pkg = require('../package.json') +const { createWorker } = require('./instance') -const NEXRENDER_API_POLLING = process.env.NEXRENDER_API_POLLING || 30 * 1000; -const NEXRENDER_TOLERATE_EMPTY_QUEUES = process.env.NEXRENDER_TOLERATE_EMPTY_QUEUES; -var emptyReturns = 0; +const instance = createWorker() -let active = true; - -const delay = amount => ( - new Promise(resolve => setTimeout(resolve, amount)) -) - -const nextJob = async (client, settings) => { - do { - try { - let job = await (settings.tagSelector ? - await client.pickupJob(settings.tagSelector) : - await client.pickupJob() - ); - - if (job && job.uid) { - emptyReturns = 0; - return job - } else { - // no job was returned by the server. If enough checks have passed, and the exit option is set, deactivate the worker - emptyReturns++; - if (settings.exitOnEmptyQueue && emptyReturns > settings.tolerateEmptyQueues) active = false; - } - - } catch (err) { - if (settings.stopOnError) { - throw err; - } else { - console.error(err) - console.error("render proccess stopped with error...") - console.error("continue listening next job...") - } - } - - if (active) await delay(settings.polling || NEXRENDER_API_POLLING) - } while (active) -} - -/** - * Starts worker "thread" of continious loop - * of fetching queued projects and rendering them - * @param {String} host - * @param {String} secret - * @param {Object} settings - * @return {Promise} - */ -const start = async (host, secret, settings, headers) => { - settings = init(Object.assign({ process: 'nexrender-worker' }, settings, { - logger: console, - })) - - settings.logger.log('starting nexrender-worker with following settings:') - Object.keys(settings).forEach(key => { - settings.logger.log(` - ${key}: ${settings[key]}`) - }) - - if (typeof settings.tagSelector == 'string') { - settings.tagSelector = settings.tagSelector.replace(/[^a-z0-9, ]/gi, '') - } - // if there is no setting for how many empty queues to tolerate, make one from the - // environment variable, or the default (which is zero) - if (!(typeof settings.tolerateEmptyQueues == 'number')) { - settings.tolerateEmptyQueues = NEXRENDER_TOLERATE_EMPTY_QUEUES; - } - - headers = headers || {}; - headers['user-agent'] = ('nexrender-worker/' + pkg.version + ' ' + (headers['user-agent'] || '')).trim(); - - const client = createClient({ host, secret, headers, name: settings.name }); - - settings.track('Worker Started', { - worker_tags_set: !!settings.tagSelector, - worker_setting_tolerate_empty_queues: settings.tolerateEmptyQueues, - worker_setting_exit_on_empty_queue: settings.exitOnEmptyQueue, - worker_setting_polling: settings.polling, - worker_setting_stop_on_error: settings.stopOnError, - }) - - do { - let job = await nextJob(client, settings); - - // if the worker has been deactivated, exit this loop - if (!active) break; - - settings.track('Worker Job Started', { - job_id: job.uid, // anonymized internally - }) - - job.state = 'started'; - job.startedAt = new Date() - - try { - await client.updateJob(job.uid, job) - } catch (err) { - console.log(`[${job.uid}] error while updating job state to ${job.state}. Job abandoned.`) - console.log(`[${job.uid}] error stack: ${err.stack}`) - continue; - } - - try { - job.onRenderProgress = (job) => { - try { - /* send render progress to our server */ - client.updateJob(job.uid, getRenderingStatus(job)); - - if (settings.onRenderProgress) { - settings.onRenderProgress(job); - } - } catch (err) { - if (settings.stopOnError) { - throw err; - } else { - console.log(`[${job.uid}] error occurred: ${err.stack}`) - console.log(`[${job.uid}] render proccess stopped with error...`) - console.log(`[${job.uid}] continue listening next job...`) - } - } - } - - job.onRenderError = (job, err /* on render error */) => { - job.error = [].concat(job.error || [], [err.toString()]); - - if (settings.onRenderError) { - settings.onRenderError(job, err); - } - } - - job = await render(job, settings); { - job.state = 'finished'; - job.finishedAt = new Date(); - if (settings.onFinished) { - settings.onFinished(job); - } - } - - settings.track('Worker Job Finished', { job_id: job.uid }) - - await client.updateJob(job.uid, getRenderingStatus(job)) - } catch (err) { - job.error = [].concat(job.error || [], [err.toString()]); - job.errorAt = new Date(); - job.state = 'error'; - - settings.track('Worker Job Error', { job_id: job.uid }); - - if (settings.onError) { - settings.onError(job, err); - } - - try { - await client.updateJob(job.uid, getRenderingStatus(job)) - } - catch (e) { - console.log(`[${job.uid}] error while updating job state to ${job.state}. Job abandoned.`) - console.log(`[${job.uid}] error stack: ${e.stack}`) - } - - if (settings.stopOnError) { - throw err; - } else { - console.log(`[${job.uid}] error occurred: ${err.stack}`) - console.log(`[${job.uid}] render proccess stopped with error...`) - console.log(`[${job.uid}] continue listening next job...`) - } - } - } while (active) -} - -/** - * Stops worker "thread" - * @return {void} - */ -const stop = () => { - active = false; -} - -/** - * Returns the current status of the worker - * @return {Boolean} - */ -const isRunning = () => { - return active; -} - -module.exports = { start, stop, isRunning } +module.exports = instance diff --git a/packages/nexrender-worker/src/instance.js b/packages/nexrender-worker/src/instance.js new file mode 100644 index 00000000..374b868a --- /dev/null +++ b/packages/nexrender-worker/src/instance.js @@ -0,0 +1,209 @@ +const { createClient } = require('@nexrender/api') +const { init, render } = require('@nexrender/core') +const { getRenderingStatus } = require('@nexrender/types/job') +const pkg = require('../package.json') + +const NEXRENDER_API_POLLING = process.env.NEXRENDER_API_POLLING || 30 * 1000; +const NEXRENDER_TOLERATE_EMPTY_QUEUES = process.env.NEXRENDER_TOLERATE_EMPTY_QUEUES; + +const delay = amount => new Promise(resolve => setTimeout(resolve, amount)) + +const createWorker = () => { + let emptyReturns = 0; + let active = false; + let settingsRef = null; + + const nextJob = async (client, settings) => { + do { + try { + let job = await (settings.tagSelector ? + await client.pickupJob(settings.tagSelector) : + await client.pickupJob() + ); + + if (job && job.uid) { + emptyReturns = 0; + return job + } else { + // no job was returned by the server. If enough checks have passed, and the exit option is set, deactivate the worker + emptyReturns++; + if (settings.exitOnEmptyQueue && emptyReturns > settings.tolerateEmptyQueues) active = false; + } + + } catch (err) { + if (settings.stopOnError) { + throw err; + } else { + console.error(err) + console.error("render proccess stopped with error...") + console.error("continue listening next job...") + } + } + + if (active) await delay(settings.polling || NEXRENDER_API_POLLING) + } while (active) + } + + /** + * Starts worker "thread" of continious loop + * of fetching queued projects and rendering them + * @param {String} host + * @param {String} secret + * @param {Object} settings + * @return {Promise} + */ + const start = async (host, secret, settings, headers) => { + settings = init(Object.assign({ process: 'nexrender-worker' }, settings, { + logger: console, + })) + + settingsRef = settings; + active = true; + + settings.logger.log('starting nexrender-worker with following settings:') + Object.keys(settings).forEach(key => { + settings.logger.log(` - ${key}: ${settings[key]}`) + }) + + if (typeof settings.tagSelector == 'string') { + settings.tagSelector = settings.tagSelector.replace(/[^a-z0-9, ]/gi, '') + } + // if there is no setting for how many empty queues to tolerate, make one from the + // environment variable, or the default (which is zero) + if (!(typeof settings.tolerateEmptyQueues == 'number')) { + settings.tolerateEmptyQueues = NEXRENDER_TOLERATE_EMPTY_QUEUES; + } + + headers = headers || {}; + headers['user-agent'] = ('nexrender-worker/' + pkg.version + ' ' + (headers['user-agent'] || '')).trim(); + + const client = createClient({ host, secret, headers, name: settings.name }); + + settings.track('Worker Started', { + worker_tags_set: !!settings.tagSelector, + worker_setting_tolerate_empty_queues: settings.tolerateEmptyQueues, + worker_setting_exit_on_empty_queue: settings.exitOnEmptyQueue, + worker_setting_polling: settings.polling, + worker_setting_stop_on_error: settings.stopOnError, + }) + + do { + let job = await nextJob(client, settings); + + // if the worker has been deactivated, exit this loop + if (!active) break; + + settings.track('Worker Job Started', { + job_id: job.uid, // anonymized internally + }) + + job.state = 'started'; + job.startedAt = new Date() + + try { + await client.updateJob(job.uid, job) + } catch (err) { + console.log(`[${job.uid}] error while updating job state to ${job.state}. Job abandoned.`) + console.log(`[${job.uid}] error stack: ${err.stack}`) + continue; + } + + try { + job.onRenderProgress = (job) => { + try { + /* send render progress to our server */ + client.updateJob(job.uid, getRenderingStatus(job)); + + if (settings.onRenderProgress) { + settings.onRenderProgress(job); + } + } catch (err) { + if (settings.stopOnError) { + throw err; + } else { + console.log(`[${job.uid}] error occurred: ${err.stack}`) + console.log(`[${job.uid}] render proccess stopped with error...`) + console.log(`[${job.uid}] continue listening next job...`) + } + } + } + + job.onRenderError = (job, err /* on render error */) => { + job.error = [].concat(job.error || [], [err.toString()]); + + if (settings.onRenderError) { + settings.onRenderError(job, err); + } + } + + job = await render(job, settings); { + job.state = 'finished'; + job.finishedAt = new Date(); + if (settings.onFinished) { + settings.onFinished(job); + } + } + + settings.track('Worker Job Finished', { job_id: job.uid }) + + await client.updateJob(job.uid, getRenderingStatus(job)) + } catch (err) { + job.error = [].concat(job.error || [], [err.toString()]); + job.errorAt = new Date(); + job.state = 'error'; + + settings.track('Worker Job Error', { job_id: job.uid }); + + if (settings.onError) { + settings.onError(job, err); + } + + try { + await client.updateJob(job.uid, getRenderingStatus(job)) + } + catch (e) { + console.log(`[${job.uid}] error while updating job state to ${job.state}. Job abandoned.`) + console.log(`[${job.uid}] error stack: ${e.stack}`) + } + + if (settings.stopOnError) { + throw err; + } else { + console.log(`[${job.uid}] error occurred: ${err.stack}`) + console.log(`[${job.uid}] render proccess stopped with error...`) + console.log(`[${job.uid}] continue listening next job...`) + } + } + } while (active) + } + + /** + * Stops worker "thread" + * @return {void} + */ + const stop = () => { + if (settingsRef) { + settingsRef.logger.log('stopping nexrender-worker') + } + + active = false; + } + + /** + * Returns the current status of the worker + * @return {Boolean} + */ + const isRunning = () => { + return active; + } + + return { + start, + stop, + isRunning + } +} + +module.exports = { + createWorker, +} diff --git a/packages/nexrender-worker/test/manual.js b/packages/nexrender-worker/test/manual.js index 3d107124..756e9ff5 100644 --- a/packages/nexrender-worker/test/manual.js +++ b/packages/nexrender-worker/test/manual.js @@ -1,21 +1,14 @@ -process.env.NEXRENDER_API_POLLING = 500; +process.env.NEXRENDER_API_POLLING = 1000 +const {createWorker} = require('../src/instance') -const { nextJob } = require('../src') +const instance1 = createWorker() +const instance2 = createWorker() +const instance3 = createWorker() -let i = 0; -const listJobs = async () => { - console.log('listJobs') - if (i++ > 5) { - return [{state: 'queued'}] - } - return []; -} +setTimeout(() => instance1.start('https://localhost:3000', 'secret', {name: 'worker 1'}), 0) +setTimeout(() => instance2.start('https://localhost:3000', 'secret', {name: 'worker 2'}), 1000) +setTimeout(() => instance3.start('https://localhost:3000', 'secret', {name: 'worker 3'}), 2000) -const client = { listJobs } - -const foo = async () => { - const p = await nextJob(client) - console.log('got job', p) -} - -foo(); +setTimeout(() => instance1.stop(), 4000) +setTimeout(() => instance2.stop(), 5000) +setTimeout(() => instance3.stop(), 6000)