-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfilesoup.py
executable file
·339 lines (299 loc) · 12.2 KB
/
filesoup.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
#!/usr/bin/env python3
"""
filesoup file hasher
A GUI interface to Python's hashlib module.
Supported hashes are listed in hashlib.algorithms_available.
Requires:
Python 3.2+
PyQt5
TODOS:
* Hash multiple files
* Export digests to file
"""
import os
import hashlib
import timeit
from PyQt5.QtCore import (pyqtSignal, pyqtSlot, Qt, QDir, QFileInfo, QObject, # pylint: disable=no-name-in-module
QRectF, QThread)
from PyQt5.QtGui import (QBrush, QColor, QKeySequence, QLinearGradient, # pylint: disable=no-name-in-module
QPalette)
from PyQt5.QtWidgets import (QApplication, QFileDialog, QFormLayout, QLineEdit, # pylint: disable=no-name-in-module
QMainWindow, QMessageBox, QPushButton, QWidget)
# Script metadata
__version__ = '0.3'
__author__ = 'Quentin Minster'

# Digest algorithms offered by the GUI: the ones listed below, restricted to
# those this interpreter's hashlib actually provides.
ALGORITHMS_AVAILABLE = hashlib.algorithms_available.intersection(
    ('md5', 'sha1', 'sha256', 'sha512'))
def read_chunk(file_, chunk_size=1024):
    """Generate the contents of an open file object piece by piece.

    Repeatedly calls file_.read(chunk_size) and yields each result, stopping
    at the first empty (falsy) one. The default chunk size is 1024.
    """
    piece = file_.read(chunk_size)
    while piece:
        yield piece
        piece = file_.read(chunk_size)
class FileSoupWindow(QMainWindow):
    """The main filesoup window.

    Shows a 'File' button next to a read-only line edit holding the selected
    path (whose background doubles as a progress bar), plus one read-only
    line edit per algorithm in ALGORITHMS_AVAILABLE. Digests are computed by
    a FileDigestWorker running in its own QThread.
    """
    # Time to wait for graceful termination of a worker thread
    # (handed to QThread.wait(), i.e. milliseconds)
    # If exceeded, the thread will be abruptly terminated
    thread_timeout = 1000
    # Span of the gradient at the edge of the progress bar
    # Keep this below 0.1
    gradient_span = 0.001
    # Progress bar color
    gradient_color = '#f99e41'
    # Hash match colors
    nomatch_color = QColor(255, 0, 0, 42)   # Qt.red + alpha
    match_color = QColor(0, 255, 0, 42)     # Qt.green + alpha

    def __init__(self, file_=None, parent=None):
        """Build the window and, if file_ is given, start hashing it."""
        super().__init__(parent)
        self.filebutton = None
        self.fileedit = None
        self.fileeditbase = None
        self.edits = {}
        self.worker = None
        self.thread = None
        self.setupUi()
        # Process the provided file, if any
        if file_ is not None:
            # canonicalFilePath() is empty when the file does not exist; fall
            # back to the name as given so the worker reports a meaningful
            # error instead of failing to open ''.
            canonical = QFileInfo(file_).canonicalFilePath()
            self.selectFile(canonical if canonical else file_)

    @staticmethod
    def _first_local_file(event):
        """Return the local path of the event's first URL, or None."""
        mime = event.mimeData()
        if not mime.hasUrls():
            return None
        urls = mime.urls()
        # Guard against an empty URL list before indexing into it
        if not urls or not urls[0].isLocalFile():
            return None
        return urls[0].toLocalFile()

    def closeEvent(self, event):
        """Handle window close requests: stop the worker, then accept."""
        # pylint: disable=invalid-name
        self.stopThread()
        event.accept()

    def dragEnterEvent(self, event):
        """Accept drags whose first URL is a local file, ignore the rest."""
        # pylint: disable=invalid-name
        if self._first_local_file(event) is not None:
            event.accept()
        else:
            event.ignore()

    def dropEvent(self, event):
        """Handle file drop events by hashing the first dropped file."""
        # pylint: disable=invalid-name
        path = self._first_local_file(event)
        if path is None:
            event.ignore()
            return
        event.accept()
        self.selectFile(path)

    @pyqtSlot(str)
    def error(self, message):
        """Display error messages from the worker."""
        QMessageBox.critical(self, 'filesoup', message)

    def keyReleaseEvent(self, event):
        """
        Handle key release events:
        * QKeySequence.Paste: check clipboard text against computed digests

        The clipboard text is lower-cased and stripped, then compared to each
        digest field: an equal field is tinted with match_color, a field of
        the same length but different content with nomatch_color. A message
        box summarizes the result unless every digest field is empty.
        """
        # pylint: disable=invalid-name
        if not event.matches(QKeySequence.Paste):
            event.ignore()
            return
        event.accept()
        text = QApplication.clipboard().text().lower().strip()
        if not text:
            return
        matched = []
        empty = True
        for alg in sorted(ALGORITHMS_AVAILABLE):
            edit = self.edits[alg]
            digest = edit.text()
            if digest:
                empty = False
            if text == digest:
                matched.append(alg)
                color = self.match_color
            elif len(text) == len(digest):
                color = self.nomatch_color
            else:
                color = None
            if color is None:
                # Drop any highlight left over from a previous paste so the
                # colors always reflect the text that was just checked
                edit.setPalette(QApplication.palette(edit))
            else:
                palette = QPalette()
                palette.setColor(QPalette.Base, color)
                edit.setPalette(palette)
        if empty:
            # No digest computed yet: nothing to report
            return
        if not matched:
            QMessageBox.information(self, 'filesoup',
                                    'No match for provided hash.')
        elif len(matched) == 1:
            QMessageBox.information(self, 'filesoup',
                                    'Hash match: '
                                    + matched[0].upper())
        else:
            QMessageBox.critical(self, 'filesoup',
                                 'Multiple hash match: %s'
                                 % ", ".join(matched))

    @pyqtSlot()
    def selectFile(self, path=None):
        """Select a file and start a worker thread to compute its digests.

        When path is None the user is asked to pick a file; nothing is
        started if no path results (dialog cancelled or empty path). Any
        worker that is still running is stopped first.
        """
        # pylint: disable=invalid-name
        # Interrupt any currently running thread
        self.stopThread()
        # Get file to process
        if path is None:
            # getOpenFileName() returns a (path, filter) tuple
            path, _ = QFileDialog.getOpenFileName(self)
        if not path:
            return
        self.fileedit.setText(QDir.toNativeSeparators(path))
        for edit in self.edits.values():
            edit.setText('')
            edit.setPalette(QApplication.palette(edit))
        # Create worker and run it in a separate thread
        # A note on signals:
        # * the worker receives its signals in the new thread's event loop
        # * the thread receives its signals in the *main* thread's event loop
        thread = QThread()
        worker = FileDigestWorker(ALGORITHMS_AVAILABLE, path)
        worker.progress.connect(self.setProgress)
        worker.digested.connect(self.setDigest)
        worker.error.connect(self.error)
        worker.moveToThread(thread)
        thread.started.connect(worker.process)
        worker.finished.connect(thread.quit)
        thread.finished.connect(self.stopThread)
        thread.start(QThread.HighPriority)
        # Keep references so neither object is garbage collected while running
        self.worker = worker
        self.thread = thread

    @pyqtSlot(str, str)
    def setDigest(self, algorithm, digest):
        """Display one of the file's digests in the row of its algorithm."""
        # pylint: disable=invalid-name
        edit = self.edits[algorithm]
        edit.setText(digest)
        # Adjust the width so that the digest fits
        # (+10 for Windows where the bounding rect width isn't quite enough)
        width = edit.fontMetrics().boundingRect(digest).width()
        edit.setMinimumWidth(width + 10)

    @pyqtSlot(float)
    def setProgress(self, progress):
        """Update the file digest computation progress bar.

        progress is a fraction of the file processed; it is clamped to
        [0, 1]. The bar is drawn as a horizontal gradient on the background
        of the file path field.
        """
        # pylint: disable=invalid-name
        # Gradient stops must lie within [0, 1]
        progress = min(1.0, max(0.0, progress))
        rect = QRectF(self.fileedit.rect())
        gradient = QLinearGradient(rect.topLeft(), rect.topRight())
        # Colored part ends just before the progress position...
        if self.gradient_span < progress:
            stop = progress - self.gradient_span
        else:
            stop = 0
        gradient.setColorAt(stop, QColor(self.gradient_color))
        gradient.setColorAt(progress, self.fileeditbase)
        # ...and the base color is reached just after it
        if progress < 1 - self.gradient_span:
            stop = progress + self.gradient_span
        else:
            stop = 1
        gradient.setColorAt(stop, self.fileeditbase)
        palette = self.fileedit.palette()
        palette.setBrush(QPalette.Base, QBrush(gradient))
        self.fileedit.setPalette(palette)

    def setupUi(self):
        """Setup the GUI: file row, one digest row per algorithm, then show."""
        # pylint: disable=invalid-name
        # Window layout
        widget = QWidget(self)
        self.setCentralWidget(widget)
        layout = QFormLayout()
        layout.setLabelAlignment(Qt.AlignRight)
        widget.setLayout(layout)
        # File row
        self.filebutton = QPushButton('File', widget)
        self.filebutton.clicked.connect(self.selectFile)
        self.fileedit = QLineEdit(widget)
        self.fileedit.setReadOnly(True)
        # Remember the default background, used as the "empty" bar color
        self.fileeditbase = self.fileedit.palette().base().color()
        layout.addRow(self.filebutton, self.fileedit)
        # Digest rows
        for alg in sorted(ALGORITHMS_AVAILABLE):
            edit = QLineEdit(widget)
            edit.setReadOnly(True)
            layout.addRow(' ' + alg.upper() + ' ', edit)
            self.edits[alg] = edit
            # Let setDigest() adjust the width of each row, using a
            # placeholder as long as a real digest of this algorithm
            digest = hashlib.new(alg)
            self.setDigest(alg, '0' * len(digest.hexdigest()))
            edit.setText('')
        self.setAcceptDrops(True)
        self.setWindowTitle('filesoup')
        self.show()

    @pyqtSlot()
    def stopThread(self):
        """Stop the worker thread, if any, and reset the progress bar."""
        # pylint: disable=invalid-name
        if self.thread is not None:
            if not self.thread.isFinished():
                # Thread still running: tell the worker to stop gracefully
                self.thread.requestInterruption()
                # Explicitly stop the thread's event loop now, since it won't
                # receive the worker's finished() signal as we want to wait()
                # for it without returning to our own event loop
                self.thread.quit()
                # Grace period for the thread to properly finish
                if not self.thread.wait(self.thread_timeout):
                    self.thread.terminate()
                # The computation was cut short: forget the file
                self.fileedit.setText('')
            # Always reset the progress bar
            self.setProgress(0)
        # Forget the worker and thread
        self.worker = None
        self.thread = None
class FileDigestWorker(QObject):
    """Worker class for computing the digests of a file.

    Meant to be moved to a QThread and driven through process(). Results are
    reported through signals only:
    * progress(float): fraction of the file processed so far
    * digested(str, str): algorithm name and hexadecimal digest
    * error(str): message of an exception raised while processing
    * finished(): emitted exactly once when process() is done, whether it
      completed, was interrupted or failed
    """
    # pylint: disable=too-few-public-methods
    # Chunk size (in bytes) to read between digests updates
    # Large chunks (>> digest.block_size) are more efficient
    chunk_size = 2*1024*1024            # 2MB
    # Interval (in bytes of file processed) between progress notifications
    # For smoother progression, should be a multiple of the chunk size
    progress_interval = 6*1024*1024     # 6MB

    progress = pyqtSignal(float)
    digested = pyqtSignal(str, str)
    error = pyqtSignal(str)
    finished = pyqtSignal()

    def __init__(self, algorithms, path, parent=None):
        """Remember the hashlib algorithm names and the path to digest."""
        super().__init__(parent)
        self.algorithms = algorithms
        self.path = path

    @pyqtSlot()
    def process(self):
        """Compute the file's digests.

        Reads the file chunk by chunk, feeding every digest, and emits
        progress() each time at least progress_interval bytes have been
        processed. Stops early if the current thread has an interruption
        request pending. Any exception is reported through error().
        finished() is always emitted last so the owning thread can quit.
        """
        try:
            self.progress.emit(0)
            digests = [hashlib.new(a) for a in self.algorithms]
            size = 0
            interval_size = 0
            with open(self.path, 'rb') as file_:
                stat = os.stat(self.path)
                start_time = timeit.default_timer()
                for chunk in read_chunk(file_, self.chunk_size):
                    # Update digests
                    for digest in digests:
                        digest.update(chunk)
                    # Notify progress
                    interval_size += len(chunk)
                    if self.progress_interval <= interval_size:
                        # Account for the interval *before* reporting, so the
                        # bar reflects what has actually been processed
                        size += interval_size
                        interval_size = 0
                        # st_size may be 0 or stale (special or growing
                        # files): skip or cap rather than divide by zero or
                        # overshoot
                        if stat.st_size > 0:
                            self.progress.emit(min(1.0, size / stat.st_size))
                    # Check for interruption request
                    if QThread.currentThread().isInterruptionRequested():
                        return
                elapsed = timeit.default_timer() - start_time
            size_mb = stat.st_size / (1024 * 1024)
            # Guard against a zero elapsed time on very small files
            rate = size_mb / elapsed if elapsed > 0 else float('inf')
            print('%s: %.2f MB in %.2f s (%.2f MB/s)'
                  % (QDir.toNativeSeparators(self.path),
                     size_mb,
                     elapsed,
                     rate))
            # Display digests
            self.progress.emit(1)
            for digest in digests:
                self.digested.emit(digest.name, digest.hexdigest())
        except Exception as ex:     # pylint: disable=broad-except
            # Top-level boundary of the worker: report, don't crash the thread
            self.error.emit(str(ex))
        finally:
            # Always let the thread's event loop know we are done
            self.finished.emit()
def main():
    """Start the Qt application and GUI.

    Opens one window per file given on the command line, or a single empty
    window when no file is given so that one can be picked or dropped.
    Exits with the status returned by the Qt event loop.
    """
    import sys
    app = QApplication(sys.argv)
    # Spawn a window per file
    # (the list keeps the windows referenced while the event loop runs)
    wins = [FileSoupWindow(arg) for arg in QApplication.arguments()[1:]]
    if not wins:
        # No file provided: still show the GUI instead of exiting right away
        wins.append(FileSoupWindow())
    sys.exit(app.exec_())


if __name__ == '__main__':
    main()