Skip to content

Commit

Permalink
added more logging to worker
Browse files Browse the repository at this point in the history
  • Loading branch information
inlife committed Dec 4, 2024
1 parent 49070f6 commit 880977d
Showing 1 changed file with 37 additions and 6 deletions.
43 changes: 37 additions & 6 deletions packages/nexrender-worker/src/instance.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,27 @@ const pkg = require('../package.json')

const NEXRENDER_API_POLLING = process.env.NEXRENDER_API_POLLING || 30 * 1000;
const NEXRENDER_TOLERATE_EMPTY_QUEUES = process.env.NEXRENDER_TOLERATE_EMPTY_QUEUES;
const NEXRENDER_PICKUP_TIMEOUT = process.env.NEXRENDER_PICKUP_TIMEOUT || 60 * 1000; // 60 second timeout by default

const delay = amount => new Promise(resolve => setTimeout(resolve, amount))

// Helper function to add timeout to promises
const withTimeout = (promise, timeoutMs, errorMsg) => {
let timeoutHandle;
const timeoutPromise = new Promise((_, reject) => {
timeoutHandle = setTimeout(() => {
reject(new Error(errorMsg));
}, timeoutMs);
});

return Promise.race([
promise,
timeoutPromise,
]).finally(() => {
clearTimeout(timeoutHandle);
});
};

const createWorker = () => {
let emptyReturns = 0;
let active = false;
Expand Down Expand Up @@ -41,9 +59,14 @@ const createWorker = () => {
return null
}

let job = await (settings.tagSelector ?
await client.pickupJob(settings.tagSelector) :
await client.pickupJob()
settings.logger.log(`[worker] checking for new jobs...`);

let job = await withTimeout(
settings.tagSelector ?
client.pickupJob(settings.tagSelector) :
client.pickupJob(),
NEXRENDER_PICKUP_TIMEOUT,
'Job pickup request timed out'
);

if (job && job.uid) {
Expand All @@ -52,20 +75,28 @@ const createWorker = () => {
} else {
// no job was returned by the server. If enough checks have passed, and the exit option is set, deactivate the worker
emptyReturns++;
if (settings.exitOnEmptyQueue && emptyReturns > settings.tolerateEmptyQueues) active = false;
settings.logger.log(`[worker] no jobs available (attempt ${emptyReturns}${settings.tolerateEmptyQueues ? ` of ${settings.tolerateEmptyQueues}` : ''})`)
if (settings.exitOnEmptyQueue && emptyReturns > settings.tolerateEmptyQueues) {
settings.logger.log(`[worker] max empty queue attempts reached, deactivating worker`)
active = false;
}
}

} catch (err) {
settings.logger.error(`[worker] error checking for jobs: ${err.message}`);
if (settings.stopOnError) {
throw err;
} else {
console.error(err)
console.error("render proccess stopped with error...")
console.error("render process stopped with error...")
console.error("continue listening next job...")
}
}

if (active) await delay(settings.polling || NEXRENDER_API_POLLING)
if (active) {
settings.logger.log(`[worker] waiting ${settings.polling || NEXRENDER_API_POLLING}ms before next check...`);
await delay(settings.polling || NEXRENDER_API_POLLING)
}
} while (active)
}

Expand Down

0 comments on commit 880977d

Please sign in to comment.