-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrunGuppy.py
executable file
·239 lines (208 loc) · 7.56 KB
/
runGuppy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
#!/home/sbsuser/.edm/envs/python3/bin/python3
import sys
import os
import re
from random import shuffle
from subprocess import Popen, PIPE
import getopt
import shutil
import h5py
import json
# Disable HDF5 file locking so fast5 files on shared cluster filesystems
# (e.g. Lustre, queried via `lfs find` below) can be opened read-only while
# another process may still hold them.
os.environ["HDF5_USE_FILE_LOCKING"] = "FALSE"
# Location and version of the Guppy basecaller installation used by the
# generated cluster jobs; the version is also recorded in the RunInfo json.
guppyBin= '/home/sbsuser/guppy-2.2.2/bin/'
guppyVersion = '2.2.2'
def usage():
    """Print the command-line help text for runGuppy.py to stdout."""
    helpText = """
runGuppy.py [-aD] [-j n] [-k kit] [-c conf] [-f flowcell_type]
Create chunks of raw fast5 input data to run several instances of
the Guppy basecaller in a cluster.
-j number of jobs to use (default 30)
-D use 1D^2 chemistry
-a get flowcell and kit from fast5 file
-f flowcell type (must specify kit too)
-k kit used in sequencing (must specify flowcell too)
-c specify a configuration file (instead of flowcell & kit)
Input fast5 must be located in the current directory tree.
Output fastq is located under the stcratch/out directory. A RunInfo json
is also created with all the experiment's details
"""
    print(helpText)
# Parse the command line; on a malformed option print the error, show the
# help text and abort.
try:
    opts, args = getopt.getopt(sys.argv[1:], "k:f:j:c:aD", ["help"])
except getopt.GetoptError as err:
    print(str(err))
    usage()
    exit()

# Run parameters: filled from the command line below and/or from the fast5
# metadata later on.  None/False means "not specified yet".
kit = flowcellType = flowcell = start = None
pc = runNumber = etype = minionId = minknowV = None
config = None          # explicit Guppy configuration file (-c)
useFast5Info = False   # -a: take flowcell/kit from the fast5 metadata
D2 = False             # -D: 1D^2 chemistry (second basecalling pass)
mask = 0o755           # mkdir mask for the scratch tree
chunks = 30            # number of cluster jobs / input chunks (-j)
pwd = os.getcwd()
# Apply the parsed options to the run parameters.
for opt, val in opts:
    if opt in ("-h", "--help"):
        usage()
        sys.exit()
    elif opt == "-j":
        chunks = int(val)
    elif opt == "-D":
        D2 = True
    elif opt == "-f":
        flowcellType = val
    elif opt == "-k":
        kit = val
    elif opt == "-a":
        useFast5Info = True
    elif opt == "-c":
        config = val
if ( kit == None and flowcellType == None and useFast5Info == False and config == None) :
usage()
exit()
if ( ( kit != None or flowcellType != None) and useFast5Info == True ) :
print("cant specify option -a and manually select flowcell and kit")
usage()
exit()
if ( ( kit == None and flowcellType == None) and useFast5Info == False and config == None ) :
print("must specify both flowcell and kit")
usage()
exit()
if ( (kit != None or flowcellType != None or useFast5Info == True) and config != None ) :
print("cant specifiy configuration and -f,-k or -a options")
usage()
exit()
def _findFast5(pattern):
    """Return files under pwd matching *pattern*, located via Lustre's lfs find.

    Errors from lfs are discarded (2> /dev/null); a failed/empty search
    yields an empty list.
    """
    cmd = "lfs find %s -type f -name '%s' 2> /dev/null" % (pwd, pattern)
    p = Popen(cmd, shell=True, stdout=PIPE)
    # communicate() reads stdout to EOF and waits for the process, so no
    # separate p.wait() is needed.
    (output, error) = p.communicate()
    return output.decode('utf-8').split('\n')[:-1]

# Get a list of all the fast5 files; include in-progress *.fast5.tmp files too.
allFast5 = _findFast5('*.fast5')
allFast5 += _findFast5('*.fast5.tmp')
# Read the experiment info from the first fast5 file found.
# BUGFIX: previously an empty search crashed with a bare IndexError.
if not allFast5:
    exit("No fast5 files found under %s!" % pwd)
f = h5py.File(allFast5[0], 'r')
topGroups = list(f.keys())
if 'UniqueGlobalKey' in topGroups:
    globalKey = 'UniqueGlobalKey'  # single read fast5
else:
    globalKey = topGroups[0]       # multi read fast5: metadata hangs off the first read group
# Both attribute sets live under the global key group.
contextTags = f['/%s/context_tags/' % globalKey].attrs
trackingId = f['/%s/tracking_id/' % globalKey].attrs
flowcellType_fast5 = contextTags['flowcell_type'].decode().upper()
kit_fast5 = contextTags['sequencing_kit'].decode().upper()
flowcell = trackingId['flow_cell_id'].decode().upper()
start = trackingId['exp_start_time'].decode()
etype = contextTags['experiment_type'].decode()
deviceType = trackingId['device_type'].decode()
deviceId = trackingId['device_id'].decode()
minknowV = trackingId['version'].decode()
# Close the HDF5 handle: the original left it open (and later shadowed `f`).
f.close()
# 1D^2 chemistry triggers a second basecalling pass in the generated jobs.
chemistry = '1D^2' if D2 else '1D'
# Store all the run parameters in a json for downstream tooling.
runInfo = { 'flowcell' : flowcell, 'flowcellType' : flowcellType_fast5 , 'kit': kit_fast5 , 'startTime' : start, 'guppyVersion' : guppyVersion , 'pc' : pc , 'runNumber' : runNumber, 'barcoded' : False, 'experimentType' : etype, 'instrumentType' : deviceType, 'instrumentId' : deviceId, 'minknowCoreVersion' : minknowV, 'chemistry' : chemistry }
# Use a context manager so the file is closed even if json.dump raises
# (the original open/close pair also shadowed the h5py handle `f`).
with open('./RunInfo', 'w') as jsonOut:
    json.dump(runInfo, jsonOut)
# Sanity-check what was read from the fast5 and reconcile it with the
# command-line selection.
if flowcell is None:
    print("Flowcell name is not defined in fast5, using None!")
if flowcellType_fast5 == '' or kit_fast5 == '':
    exit("Couldn't read run information from fast5 file (flowcell type %s, kit %s)!"%(flowcellType_fast5, kit_fast5) )
if useFast5Info or config is not None:
    # -a or -c: trust the fast5 metadata for the recorded flowcell/kit.
    flowcellType = flowcellType_fast5
    kit = kit_fast5
elif flowcellType != flowcellType_fast5 or kit != kit_fast5:
    # Manual -f/-k that disagrees with the fast5: warn but keep the user's choice.
    print("Warning specified flowcell %s and kit %s dont match fast5 info ( %s %s"%(flowcellType,kit,flowcellType_fast5,kit_fast5) )
# To try to even out execution time across jobs, shuffle the input order.
shuffle(allFast5)

# (Re)create the scratch directory tree, wiping any previous run:
#   ./scratch/out/NN/ for outputs
#   ./scratch/in/NN/  for inputs
for chunkIdx in range(chunks):
    label = "%02d" % chunkIdx
    for scratchDir in ("./scratch/out/%s" % label, "./scratch/in/%s" % label):
        if os.path.exists(scratchDir):
            shutil.rmtree(scratchDir)
        os.makedirs(scratchDir, mode=mask)
# Create symlinks to the input fast5, dealt round-robin across the chunks.
ichunk = 0
cycles = 0  # completed round-robin passes over all chunks
for ifile in allFast5:
    ifileName = os.path.basename(ifile)
    ifileName = ifileName.replace('.fast5.tmp', '.fast5')  # treat tmp as fast5
    indir = "./scratch/in/%02d/" % (ichunk)
    try:
        os.symlink(ifile, indir + ifileName)
    except OSError:
        # BUGFIX: was a bare `except:` which also swallowed SystemExit and
        # KeyboardInterrupt.  Only skip on link errors (e.g. a duplicate
        # basename already linked into this chunk).
        continue
    ichunk += 1
    if ichunk == chunks:
        cycles += 1
        ichunk = 0
# Prepare the job-array script header for the cluster scheduler.
# NOTE: `outfile` is intentionally left open; the per-chunk command
# sections are appended to it below.
outfile = open('job.cmd', 'w')
header = """#!/bin/bash
# @ output = out.%a.log
# @ error = err.%a.log
# @ total_tasks = 1
# @ cpus_per_task = 8
# @ wall_clock_limit = 18:59:00
"""
header += "# @ initialdir = .\n"
header += "# @ job_name = %s\n" % (flowcell)
header += "# @ array = 0-%s\n\n" % (chunks - 1)
# Load the required libraries for Guppy on the compute nodes.
header += "module purge\n"
header += "module load gcc/4.9.3-gold\n"
header += "module add hdf5/1.8.12\n"
header += "export LD_LIBRARY_PATH=/home/sbsuser/guppy-2.2.2/lib/:$LD_LIBRARY_PATH\n"
header += "set -e\n"
outfile.write(header)
# Emit one guarded section per chunk; at run time $SLURM_ARRAY_TASK_ID
# selects which single chunk a given array task basecalls.
for ichunk in range(0,chunks) :
    indir = "./scratch/in/%02d"%(ichunk)
    outdir = "./scratch/out/%02d"%(ichunk)
    cmd = "if [ $SLURM_ARRAY_TASK_ID -eq %s ]; then\n"%(ichunk )
    cmd +=" time %s/guppy_basecaller"%(guppyBin)
    cmd += " --input %s --save_path %s --runners 8 -t 1 --disable_pings -r"%(indir,outdir)
    # Either the flowcell/kit pair or an explicit config file selects the model.
    if ( config == None ) :
        cmd += " --flowcell %s --kit %s"%(flowcellType,kit)
    else :
        cmd += ' --config %s'%(config)
    if ( D2 == True ) : #do 1D2 chemistry
        # 1D^2 needs the basecalled fast5s, then a second 1d2 pairing pass.
        cmd += ' --fast5_out\n'
        cmd += " time %s/guppy_basecaller_1d2"%(guppyBin)
        #there is a bug in v2.2.2 of guppy, we need to specify the config file
        cmd += " --config /home/sbsuser/guppy-2.2.2/data/dna_r9.5_450bps_1d2_raw.cfg"
        cmd += " -r --input %s --save_path %s --index_file %s -t 1 --disable_pings\n"%(outdir,outdir+'/1dsq_analysis/',outdir+'/sequencing_summary.txt')
    else :
        # Plain 1D: just terminate the basecaller line.
        cmd += "\n"
    cmd += "fi\n\n"
    outfile.write(cmd)
outfile.close()
# Submit the job array to the cluster scheduler.
launch = '/opt/perf/bin/mnsubmit job.cmd'
p = Popen(launch, shell=True, stdout=PIPE, stderr=PIPE)
# BUGFIX: the original called p.wait() and then read the pipes, which can
# deadlock if the child fills a pipe buffer before exiting; communicate()
# drains both pipes to EOF and waits for the process.
(outBytes, errBytes) = p.communicate()
out = outBytes.decode()
err = errBytes.decode()
if ('ERROR' in err):
    exit("!! Error submitting job array to cluster (%s)"%(err))
print(out)