forked from billbrod/snakemake-slurm
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathslurm-submit.py
executable file
·137 lines (120 loc) · 4.49 KB
/
slurm-submit.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#!/usr/bin/env python3
import sys
import re
import argparse
import subprocess
from snakemake.utils import read_job_properties
# Build the CLI without argparse's automatic -h/--help so that a literal
# "--help" can be captured and handled by this script itself.
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument(
    "--help", help="Display help message.", action="store_true")
parser.add_argument(
    "positional", action="append",
    nargs="?", metavar="POS",
    help="additional arguments not in slurm parser group to pass to sbatch")

# A subset of SLURM-specific arguments, declared once as (flags, help) pairs
# and registered in a single pass below.
_SLURM_OPTIONS = [
    (("-a", "--array"), "job array index values"),
    (("-A", "--account"), "charge job to specified account"),
    (("--begin",), "defer job until HH:MM MM/DD/YY"),
    (("-c", "--cpus-per-task"), "number of cpus required per task"),
    (("-d", "--dependency"),
     "defer job until condition on jobid is satisfied"),
    (("-D", "--workdir"), "set working directory for batch script"),
    (("-e", "--error"), "file for batch script's standard error"),
    (("-J", "--job-name"), "name of job"),
    (("--mail-type",), "notify on state change: BEGIN, END, FAIL or ALL"),
    (("--mail-user",), "who to send email notification for job state changes"),
    (("-n", "--ntasks"), "number of tasks to run"),
    (("-N", "--nodes"), "number of nodes on which to run (N = min[-max])"),
    (("-o", "--output"), "file for batch script's standard output"),
    (("-p", "--partition"), "partition requested"),
    (("-Q", "--quiet"), "quiet mode (suppress informational messages)"),
    (("-t", "--time"), "time limit"),
    (("--wrap",), "wrap command string in a sh script and submit"),
    (("-C", "--constraint"), "specify a list of constraints"),
    (("--mem",), "minimum amount of real memory"),
    (("--gres",), "generic resource scheduling. used e.g., for gpus: --gres gpu:1"),
]

slurm_parser = parser.add_argument_group("slurm-specific arguments")
for _flags, _help_text in _SLURM_OPTIONS:
    slurm_parser.add_argument(*_flags, help=_help_text)
# Parse the sbatch-style arguments that snakemake forwards to this submit
# script (anything unrecognized lands in `positional`).
args = parser.parse_args()
if args.help:
    # --help was requested explicitly; show usage and stop without submitting.
    parser.print_help()
    sys.exit(0)
# Snakemake invokes this script with the generated jobscript as the last
# argument; read_job_properties extracts the job's properties (rule,
# resources, cluster config, ...) from that script's header.
jobscript = sys.argv[-1]
job_properties = read_job_properties(jobscript)
# Gather any pass-through positionals (the jobscript itself among them) into
# one space-prefixed string appended to the sbatch command line later.
extras = "".join(
    " " + part for part in (args.positional or []) if part is not None)

# Start from the parsed CLI options, then let per-rule "cluster" settings
# from the jobscript's properties override them.
arg_dict = vars(args).copy()
arg_dict.update(job_properties.get("cluster", {}))
# Process resources: map Snakemake per-rule resources onto sbatch options,
# never overriding values given explicitly on the command line.
if "resources" in job_properties:
    resources = job_properties["resources"]
    if arg_dict["time"] is None:
        if "runtime" in resources:
            arg_dict["time"] = resources["runtime"]
        elif "walltime" in resources:
            # BUGFIX: previously read resources["runtime"] here, which raised
            # KeyError whenever a rule supplied only "walltime".
            arg_dict["time"] = resources["walltime"]
    if "mem" in resources and arg_dict["mem"] is None:
        arg_dict["mem"] = resources["mem"]
# Threads (upstream template leaves this mapping disabled)
# if "threads" in job_properties:
#     arg_dict["ntasks"] = job_properties["threads"]

# Only these arg_dict keys are translated into sbatch flags below; anything
# else (help, positional, cluster-config extras) is ignored.
opt_keys = ["array", "account", "begin", "cpus_per_task",
            # BUGFIX: was misspelled "depedency", so -d/--dependency and any
            # cluster-config "dependency" value were silently dropped.
            "dependency", "workdir", "error", "job_name", "mail_type",
            "mail_user", "ntasks", "nodes", "output", "partition",
            "quiet", "time", "wrap", "constraint", "mem", "gres"]
# Set default partition: this site configures none, so when neither the CLI
# nor the cluster config names a partition (or it is the literal string
# "None"), drop the flag entirely and let the SLURM controller pick the
# system default partition. (The original template had an always-true
# `if not "":` cookiecutter placeholder here; its dead else-branch, which
# would have set an empty partition name, has been removed.)
if arg_dict["partition"] is None or arg_dict["partition"] == "None":
    opt_keys.remove("partition")

# Render every recognized, non-None option as an sbatch flag; underscores in
# argparse dest names become hyphens (cpus_per_task -> --cpus-per-task), and
# values are double-quoted for the shell.
opts = "".join(
    " --{} \"{}\" ".format(k.replace("_", "-"), v)
    for k, v in arg_dict.items()
    if k in opt_keys and v is not None
)
# Build the sbatch command line. With --wrap the command to run is already
# inside opts, so the positional jobscript is deliberately omitted; otherwise
# the jobscript (and any other extras) is submitted as the batch script.
if arg_dict["wrap"] is not None:
    cmd = "sbatch {opts}".format(opts=opts)
else:
    cmd = "sbatch {opts} {extras}".format(opts=opts, extras=extras)

# Submit the job. A non-zero sbatch exit raises CalledProcessError, which is
# allowed to propagate so snakemake sees the submission failure. (The
# original wrapped this in `except CalledProcessError as e: raise e`, a
# no-op that only obscured the traceback.)
res = subprocess.run(cmd, check=True, shell=True, stdout=subprocess.PIPE)

# Get jobid: sbatch prints "Submitted batch job <id>"; snakemake expects the
# bare numeric id on stdout so it can track the job afterwards.
output = res.stdout.decode()
try:
    # Raw string for the regex: "\d" in a plain string is an invalid escape
    # and warns on modern Pythons.
    match = re.search(r"Submitted batch job (\d+)", output)
    jobid = match.group(1)
    print(jobid)
except Exception as e:
    # e.g. AttributeError when sbatch output did not match: report and fail.
    print(e)
    raise