-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathparse_aln.py
executable file
·137 lines (119 loc) · 3.85 KB
/
parse_aln.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#!/usr/bin/env python3
'''
parse .aln file - output from cap3 program. Output is fasta file and
profile file
'''
import argparse
import re
def parse_args():
'''Argument parsin'''
description = """
parsing cap3 assembly aln output
"""
parser = argparse.ArgumentParser(
description=description,
formatter_class=argparse.RawTextHelpFormatter)
parser.add_argument('-a',
'--aln_file',
default=None,
required=True,
help="Aln file input",
type=str,
action='store')
parser.add_argument('-f',
'--fasta',
default=None,
required=True,
help="fasta output file name",
type=str,
action='store')
parser.add_argument('-p',
'--profile',
default=None,
required=True,
help="output file for coverage profile",
type=str,
action="store")
return parser.parse_args()
def get_header(f):
aln_header = " . : . : . : . : . : . :"
contig_lead = "******************"
aln_start = -1
while True:
line = f.readline()
if not line:
return None, None
if line[0:18] == contig_lead:
line2 = f.readline()
else:
continue
if aln_header in line2:
aln_start = line2.index(aln_header)
break
contig_name = line.split()[1] + line.split()[2]
return contig_name, aln_start
def segment_start(f):
pos = f.tell()
line = f.readline()
# detect next contig or end of file
if "********" in line or line == "" or "Number of segment pairs = " in line:
segment = False
else:
segment = True
f.seek(pos)
return segment
def get_segment(f, seq_start):
if not segment_start(f):
return None, None
aln = []
while True:
line = f.readline()
if ". : . :" in line:
continue
if "__________" in line:
consensus = f.readline().rstrip('\n')[seq_start:]
f.readline() # empty line
break
else:
aln.append(line.rstrip('\n')[seq_start:])
return aln, consensus
def aln2coverage(aln):
coverage = [0] * len(aln[0])
for a in aln:
for i, c in enumerate(a):
if c not in " -":
coverage[i] += 1
return coverage
def read_contig(f, seq_start):
contig = ""
coverage = []
while True:
aln, consensus = get_segment(f, seq_start)
if aln:
contig += consensus
coverage += aln2coverage(aln)
else:
break
return contig, coverage
def remove_gaps(consensus, coverage):
if "-" not in consensus:
return consensus, coverage
new_coverage = [cov for cons, cov in zip(consensus, coverage)
if cons != "-"]
new_consensus = consensus.replace("-", "")
return new_consensus, new_coverage
def main():
args = parse_args()
with open(args.aln_file, 'r') as f1, open(args.fasta, 'w') as ffasta, open(args.profile, 'w') as fprofile:
while True:
contig_name, seq_start = get_header(f1)
if contig_name:
consensus, coverage = remove_gaps(*read_contig(f1, seq_start))
ffasta.write(">{}\n".format(contig_name))
ffasta.write("{}\n".format(consensus))
fprofile.write(">{}\n".format(contig_name))
fprofile.write("{}\n".format(" ".join([str(i) for i in coverage])))
else:
break
if __name__ == "__main__":
main()