Extractor to submit job to an HPC cluster
ddey2 committed Apr 24, 2024
1 parent 8015028 commit d1c3e24
Showing 5 changed files with 221 additions and 0 deletions.
8 changes: 8 additions & 0 deletions sample-extractors/submit-job-to-campusc-luster/Dockerfile
@@ -0,0 +1,8 @@
FROM python:3.8

WORKDIR /extractor
COPY requirements.txt ./
RUN pip install -r requirements.txt

COPY submitjob.py extractor_info.json ./
CMD python submitjob.py
86 changes: 86 additions & 0 deletions sample-extractors/submit-job-to-campusc-luster/README.md
@@ -0,0 +1,86 @@
# Submit job to HPC cluster

This extractor lets users submit jobs to an HPC cluster directly from Clowder V2.
Adjust the hostname, username, and password/private-key details in the submitjob.py script, specify the command
you want to execute, and submit the job. Users can then monitor the job's status and output in Clowder.

For example, this particular extractor submits a job to run the [ED2](https://github.com/EDmodel/) model on
[CampusCluster](https://campuscluster.illinois.edu). It generates an output file on the cluster, monitors
the live job status, displays the status and output, and transfers the output file to Clowder.
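
The connection and job settings are defined near the top of `process_message` in `submitjob.py`. Below is a minimal sketch of the values you would typically adjust; the hostname and command are the ones used in this example, while the username and key path are placeholders:

```
import paramiko

# Connection/job settings edited in submitjob.py (placeholder values)
hostname = "cc-login.campuscluster.illinois.edu"        # login node of the target cluster
port = 22                                               # default SSH port
username = "your_cluster_username"                      # your account on the cluster
private_key = paramiko.RSAKey(filename="private_key")   # path to your SSH private key
command = "sbatch run_umbs.bat"                         # Slurm job submission command
```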

# Docker

This extractor is ready to run as a Docker container; the only dependency is a running Clowder instance. Simply build and run.

1. Start Clowder. For help starting Clowder, see our [getting started guide](https://github.com/clowder-framework/clowder/blob/develop/doc/src/sphinx/userguide/installing_clowder.rst).

2. First build the extractor Docker container:

```
# from this directory, run:
docker build -t clowder_submitjob .
```

3. Finally run the extractor:

```
docker run -t -i --rm --net clowder_clowder -e "RABBITMQ_URI=amqp://guest:guest@rabbitmq:5672/%2f" --name "submitjob" clowder_submitjob
```

Then open the Clowder web app and run the submitjob extractor on a .txt file (or similar)! Done.

### Python and Docker details

You may use any version of Python 3. Simply edit the first line of the `Dockerfile`; by default it uses `FROM python:3.8`.

Docker flags:

- `--net` connects the extractor to the Clowder Docker network (run `docker network ls` to identify yours).
- `-e RABBITMQ_URI=` sets the environment variable that controls which RabbitMQ server and exchange the extractor binds to. Setting `RABBITMQ_EXCHANGE` may also help (see the example below).
- You can also use `--link` to link the extractor to a RabbitMQ container.
- `--name` assigns the container a name visible in Docker Desktop.
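
For example, a run command that also pins the exchange might look like this (the `clowder` exchange name is an assumption; use whatever exchange your Clowder instance is configured with):

```
docker run -t -i --rm --net clowder_clowder \
  -e "RABBITMQ_URI=amqp://guest:guest@rabbitmq:5672/%2f" \
  -e "RABBITMQ_EXCHANGE=clowder" \
  --name "submitjob" clowder_submitjob
```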

## Troubleshooting

**If you run into _any_ trouble**, please reach out on our Clowder Slack in the [#pyclowder channel](https://clowder-software.slack.com/archives/CNC2UVBCP).

Alternate methods of running extractors are below.

# Commandline Execution

To execute the extractor from the command line you will need to have the required packages installed. It is highly recommended to use a Python virtual environment for this: create the virtual environment, activate it, and install the required packages.

```
virtualenv /home/clowder/virtualenv/submitjob
. /home/clowder/virtualenv/submitjob/bin/activate
pip install -r /home/clowder/extractors/submitjob/requirements.txt
```

To start the extractor, activate the virtual environment and run the script.

```
. /home/clowder/virtualenv/submitjob/bin/activate
/home/clowder/extractors/submitjob/submitjob.py
```

# Systemd Start

The example service file provided in sample-extractors (clowder-wordcount.service, written for the wordcount extractor) will start the Docker container at system startup. This can be used on CoreOS or RedHat systems to make sure the extractor starts when the machine comes online, and it expects Docker to be installed.

Copy clowder-wordcount.service to /etc/systemd/system, edit it to set the RabbitMQ parameters (and the container name for this extractor), then run the following commands (a sketch of such a unit file is shown at the end of this section):

```
systemctl enable clowder-wordcount.service
systemctl start clowder-wordcount.service
```

To see the log you can use:

```
journalctl -f -u clowder-wordcount.service
```
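
The exact contents of the service file depend on your deployment; a minimal sketch of a unit that runs this extractor's container might look like the following (the image name, network, and RabbitMQ URI are assumptions taken from the Docker instructions above):

```
[Unit]
Description=Clowder submitjob extractor
After=docker.service
Requires=docker.service

[Service]
Restart=always
# Remove any stale container before starting a fresh one
ExecStartPre=-/usr/bin/docker rm -f submitjob
ExecStart=/usr/bin/docker run --rm --name submitjob --net clowder_clowder \
  -e "RABBITMQ_URI=amqp://guest:guest@rabbitmq:5672/%2f" clowder_submitjob
ExecStop=/usr/bin/docker stop submitjob

[Install]
WantedBy=multi-user.target
```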

# Upstart

The example conf file provided in sample-extractors will start the extractor on an Ubuntu system. This assumes the system is set up for command-line execution (see above) and makes the extractor start when the system boots. The extractor can be configured with the same environment variables as used in the Docker container, and any console output will go to /var/log/upstart/wordcount.log. A sketch of such a conf file is shown below.
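
For reference, here is a minimal sketch of such an Upstart conf; the paths and RabbitMQ URI are assumptions based on the command-line instructions above:

```
description "Clowder submitjob extractor"
start on runlevel [2345]
stop on runlevel [!2345]
respawn

env RABBITMQ_URI=amqp://guest:guest@localhost:5672/%2f

chdir /home/clowder/extractors/submitjob
exec /home/clowder/virtualenv/submitjob/bin/python submitjob.py
```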
27 changes: 27 additions & 0 deletions sample-extractors/submit-job-to-campusc-luster/extractor_info.json
@@ -0,0 +1,27 @@
{
"@context": "http://clowder.ncsa.illinois.edu/contexts/extractors.jsonld",
"name": "ncsa.submitjob",
"version": "2.0",
"description": "SubmitJob extractor. Submits job to Campus Cluster.",
"author": "Dipannita Dey <[email protected]>",
"contributors": [],
"contexts": [
{
}
],
"repository": [
{
"repType": "git",
"repUrl": ""
}
],
"process": {
"file": [
"text/*",
"application/json"
]
},
"external_services": [],
"dependencies": [],
"bibtex": []
}
3 changes: 3 additions & 0 deletions sample-extractors/submit-job-to-campusc-luster/requirements.txt
@@ -0,0 +1,3 @@
pyclowder==3.0.6
paramiko==3.4

97 changes: 97 additions & 0 deletions sample-extractors/submit-job-to-campusc-luster/submitjob.py
@@ -0,0 +1,97 @@
#!/usr/bin/env python

"""Example extractor based on the clowder code."""

import logging
import time as timer

import paramiko

import pyclowder.files
from pyclowder.extractors import Extractor


class SubmitJob(Extractor):
    """Submit a job to an HPC cluster."""

    def __init__(self):
        Extractor.__init__(self)
        self.setup()

        # setup logging for the extractor
        logging.getLogger('pyclowder').setLevel(logging.DEBUG)
        logging.getLogger('__main__').setLevel(logging.DEBUG)

    def process_message(self, connector, host, secret_key, resource, parameters):
        # Process the file and upload the results
        logger = logging.getLogger(__name__)
        inputfile = resource["local_paths"][0]
        file_id = resource['id']
        dataset_id = resource["parent"]["id"]
        print(parameters.get("parameters"))

        # SSH connection details
        hostname = 'cc-login.campuscluster.illinois.edu'  # hostname of the cluster where the job will run
        port = 22  # default SSH port
        username = ""  # username on the cluster
        # password = ""
        private_key = paramiko.RSAKey(filename="private_key")

        # Use the following if it's not an RSA key
        # private_key = paramiko.pkey.PKey.from_path(pkey_path="<private_key_file_path>")

        # command to submit the Slurm job
        command = "sbatch run_umbs.bat"

        # Create an SSH client
        ssh_client = paramiko.SSHClient()
        sftp = None

        # Automatically add the server's host key (this is insecure and should only be used for testing)
        ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        try:
            # Connect to the SSH server
            # ssh_client.connect(hostname, port, username, password)
            ssh_client.connect(hostname, port, username, pkey=private_key)

            # Submit the job; sbatch replies with e.g. "Submitted batch job 12345"
            stdin, stdout, stderr = ssh_client.exec_command(command)
            result = stdout.read().decode()

            # Print the output of the command and extract the job id
            print("Output:")
            # print(result)
            job_id = result.split()[3]

            # Check the job status periodically
            while True:
                _, stdo, stde = ssh_client.exec_command(f"squeue -u {username} -j {job_id}")
                job_status = stdo.read().decode()
                print(job_status)
                connector.message_process(resource, job_status)

                # Break the loop once the job no longer appears in the queue (completed or failed)
                if job_id not in job_status:
                    break

                # Wait before checking again (could be made an environment variable)
                timer.sleep(60)

            # filepath of the output file generated by the Slurm job on the cluster
            sftp = ssh_client.open_sftp()
            remote_file_path = f"/home/{username}/openmp_umbs.o{job_id}"
            local_file_path = f"openmp_umbs.o{job_id}"
            sftp.get(remote_file_path, local_file_path)

            # Upload the output file to Clowder
            file_id = pyclowder.files.upload_to_dataset(connector, host, secret_key, dataset_id,
                                                        local_file_path, check_duplicate=False)

        finally:
            # Close the SFTP and SSH connections
            if sftp is not None:
                sftp.close()
            ssh_client.close()


if __name__ == "__main__":
    extractor = SubmitJob()
    extractor.start()

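The Slurm batch script `run_umbs.bat` referenced above is not part of this commit. A hypothetical minimal sketch that would match the output-file naming the extractor expects (`openmp_umbs.o<jobid>` in the user's home directory) might look like this:

```
#!/bin/bash
#SBATCH --job-name=openmp_umbs
#SBATCH --output=openmp_umbs.o%j
#SBATCH --ntasks=1
#SBATCH --cpus-per-task=16
#SBATCH --time=01:00:00

# Run the ED2 model; the executable path and namelist are placeholders
./ed2_exec -f ED2IN
```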