Skip to content

Commit

Permalink
Merge pull request #310 from veg/develop
Browse files Browse the repository at this point in the history
NRM
  • Loading branch information
stevenweaver authored Mar 3, 2023
2 parents cc5784c + ca197a9 commit febf83a
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 42 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ directories:
mkdir -p app/gard/output
mkdir -p app/meme/output
mkdir -p app/multihit/output
mkdir -p app/nrm/output
mkdir -p app/prime/output
mkdir -p app/relax/output
mkdir -p app/slac/output
Expand Down
50 changes: 9 additions & 41 deletions app/hyphyjob.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ const cs = require("../lib/clientsocket.js"),

// Use redis as our key-value store
var client = redis.createClient({
host: config.redis_host, port: config.redis_port
host: config.redis_host,
port: config.redis_port
});

var hyphyJob = function() {};
Expand Down Expand Up @@ -48,25 +49,22 @@ hyphyJob.prototype.attachSocket = function() {

// Can either initialize a new job or check on previous one
hyphyJob.prototype.init = function() {

var self = this;

// store parameters in redis
client.hset(self.id, "params", JSON.stringify(self.params));
self.attachSocket();

// If check param set, don't start new job
if(self.params["checkOnly"]) {
if (self.params["checkOnly"]) {
self.torque_id = self.params["torque_id"];
self.checkJob();
} else {
self.spawn();
}

};

hyphyJob.prototype.spawn = function() {

var self = this;

self.log("spawning");
Expand Down Expand Up @@ -125,7 +123,7 @@ hyphyJob.prototype.spawn = function() {
self.onJobMetadata(status);
});

fs.writeFile(self.fn, self.stream, (err) => {
fs.writeFile(self.fn, self.stream, err => {
if (err) throw err;
// Pass filename in as opposed to generating it in spawn_hyphyJob
hyphy_job_runner.submit(self.qsub_params, self.output_dir);
Expand All @@ -142,11 +140,9 @@ hyphyJob.prototype.spawn = function() {
self.socket.on("disconnect", function() {
self.log("user disconnected");
});

};

hyphyJob.prototype.onJobCreated = function(torque_id) {

var self = this;

self.push_active_job = function() {
Expand All @@ -167,22 +163,17 @@ hyphyJob.prototype.onJobCreated = function(torque_id) {
client.hset(self.torque_id, "sites", self.params.msa[0].sites);
client.hset(self.torque_id, "sequences", self.params.msa[0].sequences);
self.push_job_once(self.id);

};

hyphyJob.prototype.onComplete = function() {

var self = this;

fs.readFile(self.results_fn, "utf8", function(err, data) {

if (err) {
// Error reading results file
self.onError("unable to read results file. " + err);
} else {

if (data && data.length > 0) {

// Prepare redis packet for delivery
var redis_packet = { results: data };
redis_packet.type = "completed";
Expand All @@ -200,19 +191,16 @@ hyphyJob.prototype.onComplete = function() {
// Remove id from active_job queue
client.lrem("active_jobs", 1, self.id);
delete this;

} else {
// Empty results file
self.onError("job seems to have completed, but no results found");
delete this;
}
}
});

};

hyphyJob.prototype.onStatusUpdate = function(data) {

var self = this;
self.current_status = data;

Expand All @@ -236,20 +224,16 @@ hyphyJob.prototype.onStatusUpdate = function(data) {

// Log status update on server
self.log("status update", str_redis_packet);

};

hyphyJob.prototype.onJobMetadata = function(data) {

var self = this;
self.stime = data.stime;
self.ctime = data.ctime;

};

// If a job is cancelled early or the result contents cannot be read
hyphyJob.prototype.onError = function(error) {

var self = this;

// The packet that will delivered to datamonkey via the publish command
Expand All @@ -264,7 +248,6 @@ hyphyJob.prototype.onError = function(error) {
var promises = [std_err_promise, progress_fn_promise, std_out_promise];

Q.allSettled(promises).then(function(results) {

// Prepare redis packet for delivery
redis_packet.stderr = results[0].value;
redis_packet.progress = results[1].value;
Expand All @@ -284,15 +267,12 @@ hyphyJob.prototype.onError = function(error) {
});

delete this;

});

};

// Called when a job is first created
// Set id and output file names
hyphyJob.prototype.setTorqueParameters = function(torque_id) {

var self = this;
self.torque_id = torque_id.torque_id;

Expand All @@ -305,12 +285,10 @@ hyphyJob.prototype.setTorqueParameters = function(torque_id) {
path.join(self.output_dir, self.qsub_script_name) +
".o" +
self.torque_id.split(".")[0];

};

// Cancel the job and report an error
hyphyJob.prototype.cancel = function() {

var self = this;

var cb = function() {
Expand All @@ -321,50 +299,40 @@ hyphyJob.prototype.cancel = function() {

self.cancel_once = _.once(jobdel.jobDelete);
self.cancel_once(self.torque_id, cb);

};

// Return whether job has completed, still running, or aborted
hyphyJob.prototype.checkJob = function() {

var self = this;

// Get results file stats
fs.stat(self.results_fn, (err, res) => {

if(err || !res) {

fs.stat(self.results_fn, (err, res) => {
if (err || !res) {
// Get status of job
var jobStatus = new JobStatus(self.torque_id);
jobStatus.returnJobStatus(self.torque_id, function(err, status) {
if(err) {
if (err) {
// If job has no status returned and there are no results, return aborted
self.warn("no status, and no completed results; job aborted");
self.onError("no status, and no completed results; job aborted");
return
return;
} else {
// If job has status returned, don't do anything
self.log("status", status);
self.onStatusUpdate(status);
return
return;
}

});

} else if (res.size > 0) {

//If job has no status returned and there are results, return completed
self.onComplete();
return;

} else {
self.warn("no status, and no completed results; job aborted");
self.onError("no status, and no completed results; job aborted");
return;
}

});

};

exports.hyphyJob = hyphyJob;
77 changes: 77 additions & 0 deletions app/nrm/nrm.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
var config = require("../../config.json"),
hyphyJob = require("../hyphyjob.js").hyphyJob,
code = require("../code").code,
util = require("util"),
fs = require("fs"),
path = require("path");

var nrm = function(socket, stream, params) {

var self = this;
self.socket = socket;
self.stream = stream;
self.params = params;

// object specific attributes
self.type = "nrm";
self.qsub_script_name = "nrm.sh";
self.qsub_script = __dirname + "/" + self.qsub_script_name;

// parameter attributes
self.msaid = self.params.msa._id;
self.id = self.params.analysis._id;
self.nwk_tree = self.params.msa[0].usertree || self.params.msa[0].nj;

// parameter-derived attributes
self.fn = __dirname + "/output/" + self.id;
self.output_dir = path.dirname(self.fn);
self.status_fn = self.fn + ".status";
self.results_short_fn = self.fn + ".nrm";
self.results_fn = self.fn + ".NRM.json";
self.progress_fn = self.fn + ".nrm.progress";
self.tree_fn = self.fn + ".tre";

self.qsub_params = [
"-l walltime=" +
config.nrm_walltime +
",nodes=1:ppn=" +
config.nrm_procs,
"-q",
config.qsub_queue,
"-v",
"fn=" +
self.fn +
",tree_fn=" +
self.tree_fn +
",sfn=" +
self.status_fn +
",pfn=" +
self.progress_fn +
",rfn=" +
self.results_fn +
",cwd=" +
__dirname +
",msaid=" +
self.msaid +
",procs=" +
config.nrm_procs,
"-o",
self.output_dir,
"-e",
self.output_dir,
self.qsub_script
];

// Write tree to a file
fs.writeFile(self.tree_fn, self.nwk_tree, function(err) {
if (err) throw err;
});

// Ensure the progress file exists
fs.openSync(self.progress_fn, "w");
self.init();

};

util.inherits(nrm, hyphyJob);
exports.nrm = nrm;
31 changes: 31 additions & 0 deletions app/nrm/nrm.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#!/bin/bash

export PATH=/usr/local/bin:$PATH
source /etc/profile.d/modules.sh

module load aocc/1.3.0
module load openmpi/gnu/3.1.6

FN=$fn
CWD=$cwd
TREE_FN=$tree_fn
STATUS_FILE=$sfn
PROGRESS_FILE=$pfn
RESULTS_FN=$rfn
GENETIC_CODE=$genetic_code
RATE_CLASSES=$rate_classes
PROCS=$procs

HYPHY=$CWD/../../.hyphy/hyphy
HYPHY_PATH=$CWD/../../.hyphy/res/
HYPHY_ANALYSES_PATH=$CWD/../../.hyphy-analyses
NRM=$HYPHY_ANALYSES_PATH/NucleotideNonREV/NRM.bf

export HYPHY_PATH=$HYPHY_PATH
trap 'echo "Error" > $STATUS_FILE; exit 1' ERR

echo "$HYPHY LIBPATH=$HYPHY_PATH $NRM --alignment $FN --output $RESULTS_FN"
$HYPHY LIBPATH=$HYPHY_PATH $NRM --alignment $FN --output $RESULTS_FN > $PROGRESS_FILE
echo "Completed" > $STATUS_FILE


2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "datamonkey-js-server",
"description": "",
"version": "2.3.7",
"version": "2.4.0",
"engines": {
"node": ">=13"
},
Expand Down
18 changes: 18 additions & 0 deletions server.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const config = require("./config.json"),
hivtrace = require("./app/hivtrace/hivtrace.js"),
meme = require("./app/meme/meme.js"),
multihit = require("./app/multihit/multihit.js"),
nrm = require("./app/nrm/nrm.js"),
prime = require("./app/prime/prime.js"),
relax = require("./app/relax/relax.js"),
slac = require("./app/slac/slac.js"),
Expand Down Expand Up @@ -215,6 +216,23 @@ io.sockets.on("connection", function(socket) {
}
});

// NRM
r.route("nrm", {
spawn: function(stream, params) {
new nrm.nrm(socket, stream, params.job);
},
check: function(params) {
params.job["checkOnly"] = true;
new nrm.nrm(socket, null, params.job);
},
resubscribe: function(params) {
new job.resubscribe(socket, params.id);
},
cancel: function(params) {
new job.cancel(socket, params.id);
}
});

// MEME
r.route("meme", {
spawn: function(stream, params) {
Expand Down

0 comments on commit febf83a

Please sign in to comment.