CADECTv1.0.2.py
import argparse
import os
import time
import gzip
from Bio import SeqIO
from Bio.SeqRecord import SeqRecord
from Bio import pairwise2


def parse_args():
    parser = argparse.ArgumentParser(description="Parse FASTA/FASTQ files and analyze sliding windows.")
    parser.add_argument("-R", "--reads", required=True, help="Input FASTA/FASTQ file")
    parser.add_argument("-w", "--window_size", type=int, default=500, help="Window size (default: 500)")
    parser.add_argument("-o", "--output_dir", required=True, help="Output directory")
    return parser.parse_args()


def parse_reads(input_file):
    """
    Parse input FASTA/FASTQ file and return a list of SeqRecord objects, supporting gzip compression.
    """
    records = []
    open_func = gzip.open if input_file.endswith('.gz') else open
    file_format = determine_format(input_file)
    with open_func(input_file, "rt") as handle:  # Ensure text mode is used for compatibility
        for record in SeqIO.parse(handle, file_format):
            records.append(record)
    return records


def determine_format(file):
    """
    Determine the file format based on the file extension, supporting gzip extensions.
    """
    base, ext = os.path.splitext(file)
    if ext == ".gz":  # Check if it's gzipped
        _, ext = os.path.splitext(base)  # Get the extension before .gz
    if ext.lower() in [".fastq", ".fq"]:
        return "fastq"
    else:
        return "fasta"


def classify_sequences(records, window_size, log_file, table_file):
    """
    Classify sequences as short, putative concatemers, or non-concatemers based on the number of windows.
    """
    short_sequences = []
    concatemers = []
    non_concatemers = []
    num_parsed = 0
    total_reads = len(records)
    for record in records:
        num_parsed += 1
        windows = split_sequence(record, window_size)
        num_windows = len(windows)
        num_overlaps = get_overlap_count(windows, window_size)
        if num_windows == 1:
            classification = "short"
            short_sequences.append(record)
        elif num_overlaps > 0:
            classification = "putative_concatemers"
            concatemers.append(record)
        else:
            classification = "non_concatemers"
            non_concatemers.append(record)
        # Write classification to output file
        table_file.write(f"{record.id}\t{classification}\t{num_windows}\t{num_overlaps}\n")
        # Write progress to log file
        log_file.write(f"Classifying: {num_parsed}/{total_reads}\n")
        log_file.flush()  # Ensure the message is written immediately
    return short_sequences, concatemers, non_concatemers
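

# Example of one row written to classification_table.txt (the read ID and counts
# below are illustrative, not taken from a real run): a read split into 3 windows
# with 2 high-identity window pairs would be recorded as
#   read_0001    putative_concatemers    3    2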


def split_sequence(record, window_size):
    """
    Split a sequence record into consecutive, non-overlapping windows of the given size;
    the final window may be shorter than window_size.
    """
    windows = []
    for i in range(0, len(record.seq), window_size):
        window = record.seq[i:i+window_size]
        windows.append(window)
    return windows
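

# Worked example (hypothetical read): a 1,200 bp sequence with window_size=500
# yields three windows of lengths 500, 500 and 200, so num_windows == 3.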


def get_overlap_count(windows, window_size):
    """
    Count window pairs that look like repeats of each other: for every pair of windows,
    compute a global alignment score (globalxx: match = 1, no mismatch or gap penalties)
    and count the pair if the score reaches at least 70% of the longer window's length.
    """
    overlap_count = 0
    for i in range(len(windows)):
        for j in range(i+1, len(windows)):
            alignments = pairwise2.align.globalxx(windows[i], windows[j], score_only=True)
            align_length = max(len(windows[i]), len(windows[j]))
            if alignments / align_length >= 0.7:
                overlap_count += 1
    return overlap_count
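

# Note: Bio.pairwise2 is deprecated in recent Biopython releases in favour of
# Bio.Align.PairwiseAligner. The function below is an assumed, optional
# alternative (not part of the original workflow and not called anywhere); with
# the aligner's default scoring (match = 1, mismatch = 0, no gap penalties) it
# should reproduce the globalxx score used above.
def get_overlap_count_aligner(windows, window_size):
    from Bio import Align  # local import so this sketch stays self-contained
    aligner = Align.PairwiseAligner()  # global alignment mode by default
    overlap_count = 0
    for i in range(len(windows)):
        for j in range(i + 1, len(windows)):
            score = aligner.score(windows[i], windows[j])
            align_length = max(len(windows[i]), len(windows[j]))
            if score / align_length >= 0.7:
                overlap_count += 1
    return overlap_count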


def main():
    args = parse_args()
    output_dir = args.output_dir
    # Create output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)
    # Parse input file
    records = parse_reads(args.reads)
    # Open log file for writing progress
    with open(os.path.join(output_dir, "progress.log"), "w") as log_file, \
            open(os.path.join(output_dir, "classification_table.txt"), "w") as table_file:
        start_time = time.time()
        log_file.write("Start time: {}\n".format(time.strftime("%Y-%m-%d %H:%M:%S")))
        # Write table header
        table_file.write("Read ID\tClassification\tNum Windows\tNum Overlaps\n")
        # Classify sequences
        short_seqs, concatemers, non_concatemers = classify_sequences(records, args.window_size, log_file, table_file)
        # Write sequences to output files based on original format
        for seq_list, classification in [(short_seqs, "short"), (concatemers, "putative_concatemers"), (non_concatemers, "non_concatemers")]:
            file_format = determine_format(args.reads).replace("fastq", "fq")  # Simplify extension to .fq for uniformity
            output_file = os.path.join(output_dir, f"{classification}.{file_format}")
            SeqIO.write(seq_list, output_file, determine_format(args.reads))
        end_time = time.time()
        log_file.write("End time: {}\n".format(time.strftime("%Y-%m-%d %H:%M:%S")))
        log_file.write("Total time: {:.2f} seconds\n".format(end_time - start_time))


if __name__ == "__main__":
    main()
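

# Example invocation (file names are placeholders, not from the original repository):
#   python CADECTv1.0.2.py -R reads.fastq.gz -w 500 -o cadect_output
# This writes progress.log, classification_table.txt and one sequence file per
# class (short, putative_concatemers, non_concatemers) into cadect_output/.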