forked from MustafaKpn/DNAcodeX
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathDNAcodeX_encoder.py
357 lines (270 loc) · 14.2 KB
/
DNAcodeX_encoder.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
import argparse
import datetime
import os
######################################### General Functions #########################################
def utf8_bin(u):
# format as 8-digit binary
return ''.join([f'{i:08b}' for i in u.encode('utf-8')])
def map_to_dna(binary_string):
table = binary_string.maketrans('10', 'GC') # Create a translation table to convert '10' to 'GC'
binary_string = binary_string.translate(table) # Apply the translation table to convert the binary representation to the encoded marker
binary_string = list(binary_string)
for i in range(1, len(binary_string), 2):
if binary_string[i] == 'C':
binary_string[i] = 'T' # Replace 'C' with 'T'
if binary_string[i] == 'G':
binary_string[i] = 'A' # Replace 'G' with 'A'
binary_string = ''.join(binary_string)
return binary_string
#################################### Huffman Encoding Functions ####################################
class node:
def __init__(self, symbol, frequency, left=None, right=None):
# symbol
self.symbol = symbol
# frequency
self.frequency = frequency
# Left node
self.left = left
# Right node
self.right = right
def build_frequency_table(data):
"""
Calculates the frequency of symbols in a given string.
Args:
data (str): The input string.
Returns:
dict: A dictionary representing the frequency of each symbol.
"""
frequency_table = dict()
for element in data:
if frequency_table.get(element) is None:
frequency_table[element] = 1 # Initialize the frequency of the symbol.
else:
frequency_table[element] += 1 # Increment the frequency of the symbol.
return frequency_table
def build_huffman_tree(frequency_table):
"""
Builds a Huffman tree based on the frequency table.
Args:
frequency_table (dict): A dictionary representing the frequency of each symbol.
Returns:
node: The root node of the Huffman tree.
"""
nodes = []
for symbol, freq in frequency_table.items():
nodes.append(node(symbol, freq)) # Create a leaf node for each symbol with its frequency.
while len(nodes) > 1:
nodes = sorted(nodes, key=lambda node: node.frequency) # Sort the nodes based on their frequency.
left_node = nodes.pop(0) # Get the node with the lowest frequency.
right_node = nodes.pop(0) # Get the next node with the lowest frequency.
parent_node = node(None, left_node.frequency + right_node.frequency, left_node, right_node) # Create a parent node with the combined frequency.
nodes.append(parent_node) # Add the parent node back to the list of nodes.
return nodes[0] # Return the root node of the Huffman tree.
def build_huffman_codes(node, code='', huffman_codes=[]):
"""
Builds Huffman codes for each symbol in the Huffman tree.
Args:
node (node): The current node in the Huffman tree.
code (str): The Huffman code generated so far (default: '').
huffman_codes (list): A list to store the Huffman codes (default: []).
Returns:
None
"""
if node is None:
return
if node.symbol is not None:
huffman_codes[node.symbol] = code # Assign the Huffman code to the symbol.
build_huffman_codes(node.left, code + '0', huffman_codes) # Traverse the left child with 'C' appended to the code.
build_huffman_codes(node.right, code + '1', huffman_codes) # Traverse the right child with 'G' appended to the code.
def huffman_encode(data):
"""
Encodes the input data using Huffman coding.
Args:
data (str): The input string to be encoded.
Returns:
tuple: A tuple containing the encoded payload and the Huffman codes.
"""
frequency_table = build_frequency_table(data) # Calculate the frequency table.
huffman_tree = build_huffman_tree(frequency_table) # Build the Huffman tree.
huffman_codes = dict()
build_huffman_codes(huffman_tree, '', huffman_codes) # Build Huffman codes for each symbol.
encoded_payload = ''.join([huffman_codes[symbol] for symbol in data]) # Encode the input data using the Huffman codes.
return encoded_payload, huffman_codes
def encode_huffman_instructions(huffman_codes):
"""
Encodes the given Huffman codes using a specific encoding scheme.
Arguments:
- huffman_codes: A dictionary containing the Huffman codes.
Returns:
- The encoded instructions as a string.
"""
codes = ''
for key, value in huffman_codes.items():
codes += ',' + key + value # Concatenate the key and value to form the encoded representation of the Huffman codes
binary_string = utf8_bin(codes)
return binary_string
def encode_marker(instructions_string):
"""
Encodes the instructions length using a specific encoding scheme.
Arguments:
- instructions_string: The instructions string.
Returns:
- The encoded marker and the length of the instructions as a tuple.
"""
instructions_len = str(len(instructions_string)) # Get the length of the instructions as a string
binary_string = ''
bin_list = [bin(ord(chr)) for chr in instructions_len] # Convert each character of the instructions length to its ASCII value and then to a binary string
for binary in bin_list:
binary = binary.replace('0b', '')
binary = binary.zfill(8)
binary_string = binary_string + binary # Concatenate the binary strings to form the complete binary representation
return binary_string, instructions_len
def gc_counter(string):
gc = round((string.count('G') + string.count('C'))/len(string)*100, 3)
return gc
def read_chrs(file_name):
"""
Encodes the given data using Huffman coding, marker encoding, and marker length encoding.
Arguments:
- data: The input data to encode.
Returns:
- The encoded string representing the data.
"""
with open(file_name, 'r', encoding='utf-8', newline='\r\n') as f:
read = f.read()
data = ''
for chr in read:
data += chr
return data
#################################### Hamming Error Correction Functions ####################################
def bit_switch(bit):
if bit == 1:
return 0
elif bit == 0:
return 1
def add_hamming(string):
string_list = []
for i in string:
string_list.append(int(i))
parity_list = []
if len(string_list) == 4:
x1 = string_list[0] ^ string_list[1] ^ string_list[3]
parity_list.append(str(x1))
x2 = string_list[0] ^ string_list[2] ^ string_list[3]
parity_list.append(str(x2))
x3 = string_list[1] ^ string_list[2] ^ string_list[3]
parity_list.append(str(x3))
elif len(string_list) == 3:
x1 = string_list[0] ^ string_list[1]
parity_list.append(str(x1))
x2 = string_list[1] ^ string_list[2]
parity_list.append(str(x2))
x3 = string_list[0] ^ string_list[2]
parity_list.append(str(x3))
elif len(string_list) == 2:
x1 = bit_switch(string_list[0])
parity_list.append(str(x1))
x2 = bit_switch(string_list[1])
parity_list.append(str(x2))
x3 = string_list[0] ^ string_list[1]
parity_list.append(str(x3))
elif len(string_list) == 1:
x1 = string_list[0]
x2 = string_list[0]
parity_list.append(str(x1))
parity_list.append(str(x2))
binary_string = string + ''.join(parity_list)
return binary_string
def add_hamming_to_string(string):
codewords = ''
parity_count = 0
for i in range(0, len(string), 4):
bits_string = string[i:i+4]
codewords = codewords + add_hamming(bits_string)
if len(bits_string) == 4 or len(bits_string) == 3 or len(bits_string) == 2:
parity_count += 3
elif len(bits_string) == 1:
parity_count += 2
return codewords, parity_count
def file_to_binary(file_data):
bytes_list = []
for byte in file_data:
binary = str(bin(byte)).replace('0b', '').zfill(8)
bytes_list.append(binary)
binary_string = ''.join(bytes_list)
return binary_string
############################################################################################################
parser = argparse.ArgumentParser(description='Huffman DNA encoding system.')
parser.add_argument('-f', '--file_name', required=True, type=str, metavar='', help='The file name you want to encode.')
parser.add_argument('-huffman', '--Huffman', required=False, action='store_true', help='To be called if you want the encoded file to be compressed using Huffman variable length codes.')
parser.add_argument('-t', '--type', required=True, choices=['jpg', 'jpeg', 'png', 'txt', 'gz', 'txt.gz'], metavar='', help='The format of the file you are encoding.')
parser.add_argument('-o', '--output_filename', required=False, type=str, default='encoded_data.txt', metavar='', help='The name of the output file you want to save the encoded data in.')
args = parser.parse_args()
if __name__ == '__main__':
input_file_size = os.path.getsize('./{}'.format(args.file_name))
current_time = datetime.datetime.now()
formatted_time = current_time.strftime("%Y%m%d%H%M%S")
print("\n\033[1;34m############################# Encoding Info #############################\033[0m")
print("\033[1;35m# File Name:\033[0m \033[93m{}\033[0m".format(args.file_name))
print("\033[1;35m# File Format:\033[0m \033[93m{}\033[0m".format(args.type))
print("\033[1;35m# File Size:\033[0m \033[93m{} bytes\033[0m".format(input_file_size))
print("\033[1;35m# Huffman:\033[0m \033[93m{}\033[0m".format(args.Huffman))
print("\033[1;35m# Error Correction Method:\033[0m \033[93mHamming\033[0m")
if args.Huffman == True:
if args.type == 'txt':
data = read_chrs(args.file_name)
suffix = '_text.txt'
data_bits = len(data) * 8
elif args.type == 'jpeg' or args.type == 'jpg' or args.type == 'png' or args.type == 'gz' or args.type == 'txt.gz':
with open(args.file_name, 'rb') as f:
read = f.read()
data = ''
for byte in read:
data += str(byte).zfill(3)
suffix = '_{}.txt'.format(args.type)
encoded_payload, huffman_codes = huffman_encode(data) # Perform Huffman encoding on the data to obtain encoded data and Huffman codes
encoded_instructions = encode_huffman_instructions(huffman_codes) # Encode the Huffman codes to obtain the encoded instructions
encoded_marker, len_instructions_len = encode_marker(encoded_instructions) # Encode the length of the instructions to obtain the marker and length of instructions
marker_len = encode_marker(len_instructions_len) # Encode the length of the instructions length to obtain the marker length
binary_data = marker_len[0] + encoded_marker + encoded_instructions + encoded_payload # Concatenate the marker length, marker, instructions, and data to form the final encoded string
encoded_payload_bits = len(encoded_payload)
compression_ratio = round((encoded_payload_bits)/(input_file_size * 8) * 100, 3)
decoding_info_ratio = round((len(marker_len) + len(encoded_marker) + len(encoded_instructions))/len(binary_data)*100, 3)
print("\n\033[1;32m> Huffman compression was applied\033[0m")
print("> Space usage BEFORE Huffman compression: \033[1;31m{} bits\033[0m".format(input_file_size * 8))
print("> Space usage AFTER Huffman compression (payload): \033[1;32m{} bits\033[0m".format(encoded_payload_bits))
print("> Space usage AFTER Huffman compression (payload + header): \033[1;32m{} bits\033[0m".format(len(binary_data)))
print("> Compression ratio (payload): \033[1;32m{} %\033[0m".format(compression_ratio))
print("> Ratio of decoding information to the full encoded data: \033[1;32m{} %\033[0m".format(decoding_info_ratio))
elif args.Huffman == False:
if args.type == 'txt':
with open(args.file_name, 'r', encoding='utf-8', newline='\r\n') as f:
read = f.read()
binary_data = utf8_bin(read)
suffix = '_text.txt'
elif args.type == 'jpeg' or args.type == 'jpg' or args.type == 'png' or args.type == 'gz' or args.type == 'txt.gz':
with open(args.file_name, 'rb') as f:
read = f.read()
binary_data = file_to_binary(read)
suffix = '_{}.txt'.format(args.type)
print("\n\033[1;31m> Huffman compression was NOT applied\033[0m")
compression_ratio = 0
decoding_info_ratio = 0
binary_data_hamming, parity_count = add_hamming_to_string(binary_data)
output_data = map_to_dna(binary_data_hamming)
print("> Hamming correction parity check bits were added to the sequence.")
print('> The number of Hamming parity bits that were added: \033[1;32m{} bits\033[0m'.format(parity_count))
print("> The ratio of parity check bits to the full length of the sequence: \033[1;32m{} %\033[0m".format(round(parity_count/len(output_data) * 100)))
print("> GC-content of the full sequence: \033[1;32m{} %\033[0m".format(gc_counter(output_data)))
print("> Full length of the sequence: \033[1;32m{} DNA bases\033[0m".format(len(output_data)))
output_filename = args.output_filename + suffix
if os.path.exists('./DNAcodeX_encoding_INFO.csv'):
pass
else:
with open('DNAcodeX_encoding_INFO.csv', 'w') as f:
f.write('Input File,ID(DateTime),Huffman,Size Before Compression (bits),Size AFter Compression (bits),Compression Ratio (payload)(%),Hamming Parity Bits Count,Parity Check Ratio (%),Output Sequence Length(DNA bases)\n')
with open('DNAcodeX_encoding_INFO.csv', 'a') as f:
f.write(args.file_name + ',' + formatted_time + ',' + str(args.Huffman) + ',' + str(input_file_size * 8) + ',' + str(len(binary_data)) + ',' + str(compression_ratio) + ',' + str(parity_count) + ',' + str(round(parity_count/len(output_data) * 100)) + ',' + str(len(output_data)) + '\n')
with open(output_filename, 'w') as f:
f.write(output_data)
print("> DNA encoded data was saved in the file: \033[1;36m{}\033[0m\n".format(output_filename))