diff --git a/packages/nexrender-worker/src/instance.js b/packages/nexrender-worker/src/instance.js index 97bff2cd..a20d4dd6 100644 --- a/packages/nexrender-worker/src/instance.js +++ b/packages/nexrender-worker/src/instance.js @@ -5,9 +5,27 @@ const pkg = require('../package.json') const NEXRENDER_API_POLLING = process.env.NEXRENDER_API_POLLING || 30 * 1000; const NEXRENDER_TOLERATE_EMPTY_QUEUES = process.env.NEXRENDER_TOLERATE_EMPTY_QUEUES; +const NEXRENDER_PICKUP_TIMEOUT = process.env.NEXRENDER_PICKUP_TIMEOUT || 60 * 1000; // 60 second timeout by default const delay = amount => new Promise(resolve => setTimeout(resolve, amount)) +// Helper function to add timeout to promises +const withTimeout = (promise, timeoutMs, errorMsg) => { + let timeoutHandle; + const timeoutPromise = new Promise((_, reject) => { + timeoutHandle = setTimeout(() => { + reject(new Error(errorMsg)); + }, timeoutMs); + }); + + return Promise.race([ + promise, + timeoutPromise, + ]).finally(() => { + clearTimeout(timeoutHandle); + }); +}; + const createWorker = () => { let emptyReturns = 0; let active = false; @@ -41,9 +59,14 @@ const createWorker = () => { return null } - let job = await (settings.tagSelector ? - await client.pickupJob(settings.tagSelector) : - await client.pickupJob() + settings.logger.log(`[worker] checking for new jobs...`); + + let job = await withTimeout( + settings.tagSelector ? + client.pickupJob(settings.tagSelector) : + client.pickupJob(), + NEXRENDER_PICKUP_TIMEOUT, + 'Job pickup request timed out' ); if (job && job.uid) { @@ -52,20 +75,28 @@ const createWorker = () => { } else { // no job was returned by the server. If enough checks have passed, and the exit option is set, deactivate the worker emptyReturns++; - if (settings.exitOnEmptyQueue && emptyReturns > settings.tolerateEmptyQueues) active = false; + settings.logger.log(`[worker] no jobs available (attempt ${emptyReturns}${settings.tolerateEmptyQueues ? ` of ${settings.tolerateEmptyQueues}` : ''})`) + if (settings.exitOnEmptyQueue && emptyReturns > settings.tolerateEmptyQueues) { + settings.logger.log(`[worker] max empty queue attempts reached, deactivating worker`) + active = false; + } } } catch (err) { + settings.logger.error(`[worker] error checking for jobs: ${err.message}`); if (settings.stopOnError) { throw err; } else { console.error(err) - console.error("render proccess stopped with error...") + console.error("render process stopped with error...") console.error("continue listening next job...") } } - if (active) await delay(settings.polling || NEXRENDER_API_POLLING) + if (active) { + settings.logger.log(`[worker] waiting ${settings.polling || NEXRENDER_API_POLLING}ms before next check...`); + await delay(settings.polling || NEXRENDER_API_POLLING) + } } while (active) }