diff --git a/Makefile b/Makefile index 149d78f..8ad0e66 100644 --- a/Makefile +++ b/Makefile @@ -44,6 +44,7 @@ directories: mkdir -p app/gard/output mkdir -p app/meme/output mkdir -p app/multihit/output + mkdir -p app/nrm/output mkdir -p app/prime/output mkdir -p app/relax/output mkdir -p app/slac/output diff --git a/app/hyphyjob.js b/app/hyphyjob.js index d16e1a8..e8f9364 100644 --- a/app/hyphyjob.js +++ b/app/hyphyjob.js @@ -12,7 +12,8 @@ const cs = require("../lib/clientsocket.js"), // Use redis as our key-value store var client = redis.createClient({ - host: config.redis_host, port: config.redis_port + host: config.redis_host, + port: config.redis_port }); var hyphyJob = function() {}; @@ -48,7 +49,6 @@ hyphyJob.prototype.attachSocket = function() { // Can either initialize a new job or check on previous one hyphyJob.prototype.init = function() { - var self = this; // store parameters in redis @@ -56,17 +56,15 @@ hyphyJob.prototype.init = function() { self.attachSocket(); // If check param set, don't start new job - if(self.params["checkOnly"]) { + if (self.params["checkOnly"]) { self.torque_id = self.params["torque_id"]; self.checkJob(); } else { self.spawn(); } - }; hyphyJob.prototype.spawn = function() { - var self = this; self.log("spawning"); @@ -125,7 +123,7 @@ hyphyJob.prototype.spawn = function() { self.onJobMetadata(status); }); - fs.writeFile(self.fn, self.stream, (err) => { + fs.writeFile(self.fn, self.stream, err => { if (err) throw err; // Pass filename in as opposed to generating it in spawn_hyphyJob hyphy_job_runner.submit(self.qsub_params, self.output_dir); @@ -142,11 +140,9 @@ hyphyJob.prototype.spawn = function() { self.socket.on("disconnect", function() { self.log("user disconnected"); }); - }; hyphyJob.prototype.onJobCreated = function(torque_id) { - var self = this; self.push_active_job = function() { @@ -167,22 +163,17 @@ hyphyJob.prototype.onJobCreated = function(torque_id) { client.hset(self.torque_id, "sites", self.params.msa[0].sites); client.hset(self.torque_id, "sequences", self.params.msa[0].sequences); self.push_job_once(self.id); - }; hyphyJob.prototype.onComplete = function() { - var self = this; fs.readFile(self.results_fn, "utf8", function(err, data) { - if (err) { // Error reading results file self.onError("unable to read results file. " + err); } else { - if (data && data.length > 0) { - // Prepare redis packet for delivery var redis_packet = { results: data }; redis_packet.type = "completed"; @@ -200,7 +191,6 @@ hyphyJob.prototype.onComplete = function() { // Remove id from active_job queue client.lrem("active_jobs", 1, self.id); delete this; - } else { // Empty results file self.onError("job seems to have completed, but no results found"); @@ -208,11 +198,9 @@ hyphyJob.prototype.onComplete = function() { } } }); - }; hyphyJob.prototype.onStatusUpdate = function(data) { - var self = this; self.current_status = data; @@ -236,20 +224,16 @@ hyphyJob.prototype.onStatusUpdate = function(data) { // Log status update on server self.log("status update", str_redis_packet); - }; hyphyJob.prototype.onJobMetadata = function(data) { - var self = this; self.stime = data.stime; self.ctime = data.ctime; - }; // If a job is cancelled early or the result contents cannot be read hyphyJob.prototype.onError = function(error) { - var self = this; // The packet that will delivered to datamonkey via the publish command @@ -264,7 +248,6 @@ hyphyJob.prototype.onError = function(error) { var promises = [std_err_promise, progress_fn_promise, std_out_promise]; Q.allSettled(promises).then(function(results) { - // Prepare redis packet for delivery redis_packet.stderr = results[0].value; redis_packet.progress = results[1].value; @@ -284,15 +267,12 @@ hyphyJob.prototype.onError = function(error) { }); delete this; - }); - }; // Called when a job is first created // Set id and output file names hyphyJob.prototype.setTorqueParameters = function(torque_id) { - var self = this; self.torque_id = torque_id.torque_id; @@ -305,12 +285,10 @@ hyphyJob.prototype.setTorqueParameters = function(torque_id) { path.join(self.output_dir, self.qsub_script_name) + ".o" + self.torque_id.split(".")[0]; - }; // Cancel the job and report an error hyphyJob.prototype.cancel = function() { - var self = this; var cb = function() { @@ -321,50 +299,40 @@ hyphyJob.prototype.cancel = function() { self.cancel_once = _.once(jobdel.jobDelete); self.cancel_once(self.torque_id, cb); - }; // Return whether job has completed, still running, or aborted hyphyJob.prototype.checkJob = function() { - var self = this; // Get results file stats - fs.stat(self.results_fn, (err, res) => { - - if(err || !res) { - + fs.stat(self.results_fn, (err, res) => { + if (err || !res) { // Get status of job var jobStatus = new JobStatus(self.torque_id); jobStatus.returnJobStatus(self.torque_id, function(err, status) { - if(err) { + if (err) { // If job has no status returned and there are no results, return aborted self.warn("no status, and no completed results; job aborted"); self.onError("no status, and no completed results; job aborted"); - return + return; } else { // If job has status returned, don't do anything self.log("status", status); self.onStatusUpdate(status); - return + return; } - }); - } else if (res.size > 0) { - //If job has no status returned and there are results, return completed self.onComplete(); return; - } else { self.warn("no status, and no completed results; job aborted"); self.onError("no status, and no completed results; job aborted"); return; } - }); - }; exports.hyphyJob = hyphyJob; diff --git a/app/nrm/nrm.js b/app/nrm/nrm.js new file mode 100644 index 0000000..d6feba5 --- /dev/null +++ b/app/nrm/nrm.js @@ -0,0 +1,77 @@ +var config = require("../../config.json"), + hyphyJob = require("../hyphyjob.js").hyphyJob, + code = require("../code").code, + util = require("util"), + fs = require("fs"), + path = require("path"); + +var nrm = function(socket, stream, params) { + + var self = this; + self.socket = socket; + self.stream = stream; + self.params = params; + + // object specific attributes + self.type = "nrm"; + self.qsub_script_name = "nrm.sh"; + self.qsub_script = __dirname + "/" + self.qsub_script_name; + + // parameter attributes + self.msaid = self.params.msa._id; + self.id = self.params.analysis._id; + self.nwk_tree = self.params.msa[0].usertree || self.params.msa[0].nj; + + // parameter-derived attributes + self.fn = __dirname + "/output/" + self.id; + self.output_dir = path.dirname(self.fn); + self.status_fn = self.fn + ".status"; + self.results_short_fn = self.fn + ".nrm"; + self.results_fn = self.fn + ".NRM.json"; + self.progress_fn = self.fn + ".nrm.progress"; + self.tree_fn = self.fn + ".tre"; + + self.qsub_params = [ + "-l walltime=" + + config.nrm_walltime + + ",nodes=1:ppn=" + + config.nrm_procs, + "-q", + config.qsub_queue, + "-v", + "fn=" + + self.fn + + ",tree_fn=" + + self.tree_fn + + ",sfn=" + + self.status_fn + + ",pfn=" + + self.progress_fn + + ",rfn=" + + self.results_fn + + ",cwd=" + + __dirname + + ",msaid=" + + self.msaid + + ",procs=" + + config.nrm_procs, + "-o", + self.output_dir, + "-e", + self.output_dir, + self.qsub_script + ]; + + // Write tree to a file + fs.writeFile(self.tree_fn, self.nwk_tree, function(err) { + if (err) throw err; + }); + + // Ensure the progress file exists + fs.openSync(self.progress_fn, "w"); + self.init(); + +}; + +util.inherits(nrm, hyphyJob); +exports.nrm = nrm; diff --git a/app/nrm/nrm.sh b/app/nrm/nrm.sh new file mode 100644 index 0000000..7ef7180 --- /dev/null +++ b/app/nrm/nrm.sh @@ -0,0 +1,31 @@ +#!/bin/bash + +export PATH=/usr/local/bin:$PATH +source /etc/profile.d/modules.sh + +module load aocc/1.3.0 +module load openmpi/gnu/3.1.6 + +FN=$fn +CWD=$cwd +TREE_FN=$tree_fn +STATUS_FILE=$sfn +PROGRESS_FILE=$pfn +RESULTS_FN=$rfn +GENETIC_CODE=$genetic_code +RATE_CLASSES=$rate_classes +PROCS=$procs + +HYPHY=$CWD/../../.hyphy/hyphy +HYPHY_PATH=$CWD/../../.hyphy/res/ +HYPHY_ANALYSES_PATH=$CWD/../../.hyphy-analyses +NRM=$HYPHY_ANALYSES_PATH/NucleotideNonREV/NRM.bf + +export HYPHY_PATH=$HYPHY_PATH +trap 'echo "Error" > $STATUS_FILE; exit 1' ERR + +echo "$HYPHY LIBPATH=$HYPHY_PATH $NRM --alignment $FN --output $RESULTS_FN" +$HYPHY LIBPATH=$HYPHY_PATH $NRM --alignment $FN --output $RESULTS_FN > $PROGRESS_FILE +echo "Completed" > $STATUS_FILE + + diff --git a/package.json b/package.json index b39c440..f165a9e 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "datamonkey-js-server", "description": "", - "version": "2.3.7", + "version": "2.4.0", "engines": { "node": ">=13" }, diff --git a/server.js b/server.js index fcb9eda..5238b4f 100644 --- a/server.js +++ b/server.js @@ -13,6 +13,7 @@ const config = require("./config.json"), hivtrace = require("./app/hivtrace/hivtrace.js"), meme = require("./app/meme/meme.js"), multihit = require("./app/multihit/multihit.js"), + nrm = require("./app/nrm/nrm.js"), prime = require("./app/prime/prime.js"), relax = require("./app/relax/relax.js"), slac = require("./app/slac/slac.js"), @@ -215,6 +216,23 @@ io.sockets.on("connection", function(socket) { } }); +// NRM + r.route("nrm", { + spawn: function(stream, params) { + new nrm.nrm(socket, stream, params.job); + }, + check: function(params) { + params.job["checkOnly"] = true; + new nrm.nrm(socket, null, params.job); + }, + resubscribe: function(params) { + new job.resubscribe(socket, params.id); + }, + cancel: function(params) { + new job.cancel(socket, params.id); + } + }); + // MEME r.route("meme", { spawn: function(stream, params) {