diff --git a/README.md b/README.md index a3b6334..786ca38 100644 --- a/README.md +++ b/README.md @@ -50,8 +50,8 @@ Lib Usage ```python archive = destream.open("some_file.tar.gz") - assert isinstance(archive, destream.ArchivePack) \ - and isinstance(archive.tarfile, tarfile.TarFile) + assert (isinstance(archive, destream.ArchivePack) and + isinstance(archive.tarfile, tarfile.TarFile)) # ==> we can extract members using extract() and extractall() archive.extractall("/tmp") diff --git a/destream/archive.py b/destream/archive.py index 2dd5956..ae95f0a 100644 --- a/destream/archive.py +++ b/destream/archive.py @@ -8,16 +8,18 @@ """.split() -RE_EXTENSION = re.compile(r'^(.*?)(\.([^.]+))?$') +RE_EXTENSION = re.compile(r"^(.*?)(\.([^.]+))?$") class Archive(io.BufferedReader): """ Base class to Archive file """ + def __init__(self, name, fileobj=None, source=None, closefd=True): - assert type(self) != Archive, \ - "This class can not be used in standalone" + assert ( + type(self) != Archive + ), "This class can not be used in standalone" if not fileobj: fileobj = io.BytesIO() elif sys.version_info < (3, 0) and isinstance(fileobj, file): @@ -32,7 +34,7 @@ def __init__(self, name, fileobj=None, source=None, closefd=True): f"got {type(fileobj)}" ) io.BufferedReader.__init__(self, fileobj) - self.realname = name or '' + self.realname = name or "" self.source = source self.closefd = closefd if isinstance(source, Archive): @@ -41,36 +43,38 @@ def __init__(self, name, fileobj=None, source=None, closefd=True): else: self._decompressors = [type(self)] self.compressions = [] - if hasattr(self, '_compression'): + if hasattr(self, "_compression"): self.compressions += [self._compression] @classmethod - def _check_availability(self): + def _check_availability(cls): pass @classmethod def _guess(cls, mime, name, fileobj): - if getattr(cls, '_unique_instance', False): + if getattr(cls, "_unique_instance", False): if cls in fileobj._decompressors: raise ValueError( f"class {cls} already in the decompressor list" ) realname = name - if hasattr(cls, '_mimes'): + if hasattr(cls, "_mimes"): match = RE_EXTENSION.search(name) - if hasattr(cls, '_extensions') and \ - match.group(2) and \ - os.path.normcase(match.group(3)) in cls._extensions: + if ( + hasattr(cls, "_extensions") + and match.group(2) + and os.path.normcase(match.group(3)) in cls._extensions + ): realname = match.group(1) if mime not in cls._mimes: raise ValueError( (cls, mime, name, fileobj), - f"can not decompress fileobj using class {cls.__name__}" + f"can not decompress fileobj using class {cls.__name__}", ) return realname def close(self): - if getattr(self, 'closefd', True): + if getattr(self, "closefd", True): super().close() @@ -78,6 +82,7 @@ class ArchivePack(Archive): """ Base class for an archive that is also a pack of file (tar, zip, ...) """ + def __init__(self, name, fileobj=None, source=None): Archive.__init__(self, name, fileobj, source=source) @@ -86,16 +91,20 @@ def single(self): def members(self): raise NotImplementedError( - f"class {type(self)} does not implement this method") + f"class {type(self)} does not implement this method" + ) def open(self, member): raise NotImplementedError( - f"class {type(self)} does not implement this method") + f"class {type(self)} does not implement this method" + ) def extract(self, member, path): raise NotImplementedError( - f"class {type(self)} does not implement this method") + f"class {type(self)} does not implement this method" + ) def extractall(self, path, members=None): raise NotImplementedError( - f"class {type(self)} does not implement this method") + f"class {type(self)} does not implement this method" + ) diff --git a/destream/decompressors/__init__.py b/destream/decompressors/__init__.py index ca6e27b..0ea99ff 100644 --- a/destream/decompressors/__init__.py +++ b/destream/decompressors/__init__.py @@ -15,7 +15,8 @@ builtin_decompressors = [ (0, symbol) - for name, symbol - in inspect.getmembers(sys.modules[__name__], inspect.isclass) + for name, symbol in inspect.getmembers( + sys.modules[__name__], inspect.isclass + ) if issubclass(symbol, archive.Archive) ] diff --git a/destream/decompressors/bzip2.py b/destream/decompressors/bzip2.py index 452f7fe..27e9f90 100644 --- a/destream/decompressors/bzip2.py +++ b/destream/decompressors/bzip2.py @@ -1,16 +1,16 @@ from destream import ExternalPipe -__all__ = ['Bunzip2'] +__all__ = ["Bunzip2"] class Bunzip2(ExternalPipe): - _mimes = ['application/x-bzip2'] - _extensions = ['bz2', 'bz', 'tbz2', 'tbz'] - _command = ['bunzip2'] - _compression = 'bzip2' + _mimes = ["application/x-bzip2"] + _extensions = ["bz2", "bz", "tbz2", "tbz"] + _command = ["bunzip2"] + _compression = "bzip2" @classmethod def _guess(cls, mime, name, fileobj): - is_tar = name.endswith('.tbz2') or name.endswith('.tbz') + is_tar = name.endswith(".tbz2") or name.endswith(".tbz") realname = super()._guess(mime, name, fileobj) - return realname + '.tar' if is_tar else realname + return realname + ".tar" if is_tar else realname diff --git a/destream/decompressors/gzip.py b/destream/decompressors/gzip.py index 107380d..cf3f4e8 100644 --- a/destream/decompressors/gzip.py +++ b/destream/decompressors/gzip.py @@ -1,34 +1,34 @@ from destream import ExternalPipe -__all__ = ['Gunzip'] +__all__ = ["Gunzip"] class Gunzip(ExternalPipe): _mimes = [ - 'application/x-gzip', - 'application/gzip', + "application/x-gzip", + "application/gzip", ] - _extensions = ['gz'] - _command = ['gunzip'] - _compression = 'gzip' + _extensions = ["gz"] + _command = ["gunzip"] + _compression = "gzip" @classmethod def _guess(cls, mime, name, fileobj): if mime not in cls._mimes: raise ValueError("not a gzip compression") lowered = name.lower() - if lowered.endswith('.gz'): + if lowered.endswith(".gz"): realname = name[:-3] - elif lowered.endswith('-gz'): + elif lowered.endswith("-gz"): realname = name[:-3] - elif lowered.endswith('.z'): + elif lowered.endswith(".z"): realname = name[:-2] - elif lowered.endswith('-z'): + elif lowered.endswith("-z"): realname = name[:-2] - elif lowered.endswith('_z'): + elif lowered.endswith("_z"): realname = name[:-2] - elif lowered.endswith('.tgz') or lowered.endswith('.taz'): - realname = name[:-4] + '.tar' + elif lowered.endswith(".tgz") or lowered.endswith(".taz"): + realname = name[:-4] + ".tar" else: realname = name return realname diff --git a/destream/decompressors/lzma.py b/destream/decompressors/lzma.py index a9a1488..a9beb3d 100644 --- a/destream/decompressors/lzma.py +++ b/destream/decompressors/lzma.py @@ -1,10 +1,10 @@ from destream import ExternalPipe -__all__ = ['Unlzma'] +__all__ = ["Unlzma"] class Unlzma(ExternalPipe): - _mimes = ['application/x-lzma'] - _extensions = ['lzma'] - _command = 'unlzma -c'.split() - _compression = 'lzma' + _mimes = ["application/x-lzma"] + _extensions = ["lzma"] + _command = "unlzma -c".split() + _compression = "lzma" diff --git a/destream/decompressors/p7zip.py b/destream/decompressors/p7zip.py index c5f3744..8564bf6 100644 --- a/destream/decompressors/p7zip.py +++ b/destream/decompressors/p7zip.py @@ -6,19 +6,19 @@ from destream import ArchivePack, ArchiveTemp, ExternalPipe -__all__ = ['Un7z'] +__all__ = ["Un7z"] -ereg_header = re.compile('^'+r'--+\n(.+?)(?=\n\n)', re.M+re.S) -ereg_member = re.compile('^'+r'(.+?)(?=\n\n)', re.M+re.S) +ereg_header = re.compile("^" + r"--+\n(.+?)(?=\n\n)", re.M + re.S) +ereg_member = re.compile("^" + r"(.+?)(?=\n\n)", re.M + re.S) def parse_hunk(hunk): info = {} for m in re.finditer( - r'^[ \t\f]*(.+?)[ \t\f]*=[ \t\f]*(.*?)[ \t\f]*$', - hunk, flags=re.M): - key = re.sub(r'\W', '_', m.group(1).lower()) + r"^[ \t\f]*(.+?)[ \t\f]*=[ \t\f]*(.*?)[ \t\f]*$", hunk, flags=re.M + ): + key = re.sub(r"\W", "_", m.group(1).lower()) info[key] = m.group(2) return info @@ -26,36 +26,39 @@ def parse_hunk(hunk): class Header: def __init__(self, hunk): info = parse_hunk(hunk) - info['physical_size'] = int(info['physical_size']) - info['headers_size'] = int(info['headers_size']) - info['blocks'] = int(info['blocks']) + info["physical_size"] = int(info["physical_size"]) + info["headers_size"] = int(info["headers_size"]) + info["blocks"] = int(info["blocks"]) self.__dict__.update(info) class Member: def __init__(self, hunk): info = parse_hunk(hunk) - info['filename'] = info['path'] - info['size'] = int(info['size']) - info['packed_size'] = int(info['packed_size'] or '0') - info['block'] = int(info['block'] or '0') - if info['crc']: - info['crc'] = reduce(lambda x, y: x * 256 + y, \ - struct.unpack('BBBB', binascii.unhexlify(info['crc'])), 0) + info["filename"] = info["path"] + info["size"] = int(info["size"]) + info["packed_size"] = int(info["packed_size"] or "0") + info["block"] = int(info["block"] or "0") + if info["crc"]: + info["crc"] = reduce( + lambda x, y: x * 256 + y, + struct.unpack("BBBB", binascii.unhexlify(info["crc"])), + 0, + ) self.__dict__.update(info) def isfile(self): - return self.attributes[0] != 'D' + return self.attributes[0] != "D" def isdir(self): - return self.attributes[0] == 'D' + return self.attributes[0] == "D" class Un7z(ArchivePack): - _mimes = ['application/x-7z-compressed'] - _extensions = ['7z'] - _command = ['7zr'] - _compression = '7z' + _mimes = ["application/x-7z-compressed"] + _extensions = ["7z"] + _command = ["7zr"] + _compression = "7z" @classmethod def _check_availability(cls): @@ -63,17 +66,21 @@ def _check_availability(cls): def __init__(self, name, fileobj): self.fileobj = ArchiveTemp(fileobj) - info = check_output(self._command + - ['l', self.fileobj.name, '-slt']).decode() + info = check_output( + self._command + ["l", self.fileobj.name, "-slt"] + ).decode() self.header = Header(ereg_header.search(info).group(1)) - self._members = [Member(m.group(1)) \ - for m in ereg_member.finditer(info, - re.search('^'+'-'*10+'$', info, re.M).end(0))] - self._stream = (len(self._members) == 1) + self._members = [ + Member(m.group(1)) + for m in ereg_member.finditer( + info, re.search("^" + "-" * 10 + "$", info, re.M).end(0) + ) + ] + self._stream = len(self._members) == 1 if self._stream: stream = self.open(self._members[0]) stream_name = self._members[0].filename - self._compression += ':' + stream_name + self._compression += ":" + stream_name else: stream_name = name stream = self.fileobj @@ -84,10 +91,17 @@ def members(self): return self._members def open(self, member): - p = Popen(self._command + - ['e', self.fileobj.name, '-so', - (member.filename if isinstance(member, Member) else member)], - stdout=PIPE, stderr=PIPE) + p = Popen( + self._command + + [ + "e", + self.fileobj.name, + "-so", + (member.filename if isinstance(member, Member) else member), + ], + stdout=PIPE, + stderr=PIPE, + ) if self._stream: self._p = p return p.stdout @@ -97,7 +111,8 @@ def open(self, member): retcode = p.wait() if retcode: raise CalledProcessError( - retcode, self._command, output=p.stderr.read()) + retcode, self._command, output=p.stderr.read() + ) finally: p.stdout.close() p.stderr.close() @@ -120,27 +135,40 @@ def close(self): self.fileobj.close() def extract(self, member, path): - p = Popen(self._command + - ['x', self.fileobj.name, '-y', '-o'+path, - (member.filename if isinstance(member, Member) else member)], - stdout=PIPE) + p = Popen( + self._command + + [ + "x", + self.fileobj.name, + "-y", + "-o" + path, + (member.filename if isinstance(member, Member) else member), + ], + stdout=PIPE, + ) try: retcode = p.wait() if retcode: raise CalledProcessError( - retcode, self._command, output=p.stdout.read()) + retcode, self._command, output=p.stdout.read() + ) finally: p.stdout.close() - def extractall(self, path, members=[]): - p = Popen(self._command + - ['x', self.fileobj.name, '-y', '-o'+path] + - [(m.filename if isinstance(m, Member) else m) for m in members], - stdout=PIPE) + def extractall(self, path, members=None): + if members is None: + members = [] + p = Popen( + self._command + + ["x", self.fileobj.name, "-y", "-o" + path] + + [(m.filename if isinstance(m, Member) else m) for m in members], + stdout=PIPE, + ) try: retcode = p.wait() if retcode: raise CalledProcessError( - retcode, self._command, output=p.stdout.read()) + retcode, self._command, output=p.stdout.read() + ) finally: p.stdout.close() diff --git a/destream/decompressors/rar.py b/destream/decompressors/rar.py index 999d315..1231c1a 100644 --- a/destream/decompressors/rar.py +++ b/destream/decompressors/rar.py @@ -7,18 +7,18 @@ from destream import ArchivePack, ArchiveTemp, ExternalPipe -__all__ = ['Unrar'] +__all__ = ["Unrar"] def iter_on_hunks(hunks): for hunk in hunks: info = {} for m in re.finditer( - r'^[ \t\f]*(.+?)[ \t\f]*:[ \t\f]*(.*?)[ \t\f]*$', - hunk, flags=re.M): - key = re.sub(r'\W', '_', m.group(1).lower()) + r"^[ \t\f]*(.+?)[ \t\f]*:[ \t\f]*(.*?)[ \t\f]*$", hunk, flags=re.M + ): + key = re.sub(r"\W", "_", m.group(1).lower()) info[key] = m.group(2) - if info.get('service', '') == 'EOF': + if info.get("service", "") == "EOF": break yield info @@ -26,36 +26,40 @@ def iter_on_hunks(hunks): class Header: def __init__(self, info): self.__dict__.update(info) - assert 'RAR' in self.details, f"Maybe not a RAR file: {self.details}" + assert "RAR" in self.details, f"Maybe not a RAR file: {self.details}" class Member: def __init__(self, info): - info['filename'] = info.pop('name') - info['size'] = int(info.get('size', 0)) - info['packed_size'] = int(info.get('packed_size', 0)) - info['ratio'] = float(info.get('ratio', '0%')[:-1]) / 100 - info['crc32'] = reduce(lambda x, y: x * 256 + y, \ - struct.unpack('BBBB', binascii.unhexlify(info['crc32'])), 0) + info["filename"] = info.pop("name") + info["size"] = int(info.get("size", 0)) + info["packed_size"] = int(info.get("packed_size", 0)) + info["ratio"] = float(info.get("ratio", "0%")[:-1]) / 100 + info["crc32"] = reduce( + lambda x, y: x * 256 + y, + struct.unpack("BBBB", binascii.unhexlify(info["crc32"])), + 0, + ) self.__dict__.update(info) def isfile(self): - return self.type == 'File' + return self.type == "File" def isdir(self): - return self.type == 'Directory' + return self.type == "Directory" + class Unrar(ArchivePack): - _mimes = ['application/x-rar'] - _extensions = ['rar'] - _command = ['rar'] - _compression = 'rar' + _mimes = ["application/x-rar"] + _extensions = ["rar"] + _command = ["rar"] + _compression = "rar" # NOTE: # https://en.wikipedia.org/wiki/Unrar # Unrar is the name of two different programs, we should prefer rar by # default to make sure to use the most recent and compatible version if # available. - __fallbackcommands__ = ['unrar'] + __fallbackcommands__ = ["unrar"] @classmethod def _check_availability(cls): @@ -65,23 +69,25 @@ def _check_availability(cls): assert matches, f"{cls._command[0]}: can not determine version" cls.version = tuple(Version(matches.group(1)).version) # NOTE: the parameter vta is available from version 5 - assert cls.version >= (5, 0), ( - f"{cls._command[0]}: incompatible version {cls.version}" - ) + assert cls.version >= ( + 5, + 0, + ), f"{cls._command[0]}: incompatible version {cls.version}" def __init__(self, name, fileobj): self.fileobj = ArchiveTemp(fileobj) - output = check_output(self._command + - ['vta', self.fileobj.name]).decode() + output = check_output( + self._command + ["vta", self.fileobj.name] + ).decode() hunks = iter_on_hunks(output.split("\n\n")) self.information = next(hunks) self.header = Header(next(hunks)) self._members = [m for m in (Member(h) for h in hunks)] - self._stream = (len(self._members) == 1) + self._stream = len(self._members) == 1 if self._stream: stream = self.open(self._members[0]) stream_name = self._members[0].filename - self._compression += ':' + stream_name + self._compression += ":" + stream_name else: stream_name = name stream = self.fileobj @@ -92,10 +98,17 @@ def members(self): return self._members def open(self, member): - p = Popen(self._command + - ['p', '-ierr', self.fileobj.name, - (member.filename if isinstance(member, Member) else member)], - stdout=PIPE, stderr=PIPE) + p = Popen( + self._command + + [ + "p", + "-ierr", + self.fileobj.name, + (member.filename if isinstance(member, Member) else member), + ], + stdout=PIPE, + stderr=PIPE, + ) if self._stream: self._p = p return p.stdout @@ -105,7 +118,8 @@ def open(self, member): retcode = p.wait() if retcode: raise CalledProcessError( - retcode, self._command, output=p.stderr.read()) + retcode, self._command, output=p.stderr.read() + ) finally: p.stdout.close() p.stderr.close() @@ -128,27 +142,40 @@ def close(self): self.fileobj.close() def extract(self, member, path): - p = Popen(self._command + - ['x', self.fileobj.name, - (member.filename if isinstance(member, Member) else member), - path], stdout=PIPE) + p = Popen( + self._command + + [ + "x", + self.fileobj.name, + (member.filename if isinstance(member, Member) else member), + path, + ], + stdout=PIPE, + ) try: retcode = p.wait() if retcode: raise CalledProcessError( - retcode, self._command, output=p.stdout.read()) + retcode, self._command, output=p.stdout.read() + ) finally: p.stdout.close() - def extractall(self, path, members=[]): - p = Popen(self._command + - ['x', self.fileobj.name] + - [(m.filename if isinstance(m, Member) else m) for m in members] + - [path], stdout=PIPE) + def extractall(self, path, members=None): + if members is None: + members = [] + p = Popen( + self._command + + ["x", self.fileobj.name] + + [(m.filename if isinstance(m, Member) else m) for m in members] + + [path], + stdout=PIPE, + ) try: retcode = p.wait() if retcode: raise CalledProcessError( - retcode, self._command, output=p.stdout.read()) + retcode, self._command, output=p.stdout.read() + ) finally: p.stdout.close() diff --git a/destream/decompressors/tar.py b/destream/decompressors/tar.py index 62e1e3f..678d350 100644 --- a/destream/decompressors/tar.py +++ b/destream/decompressors/tar.py @@ -1,11 +1,10 @@ import tarfile as tarlib import io -import sys from os import SEEK_SET from destream import ArchivePack, make_seekable -__all__ = ['Untar'] +__all__ = ["Untar"] class FileMember(io.IOBase, tarlib.ExFileObject): @@ -50,17 +49,18 @@ def peek(self, n): return buf def readinto(self, b): - if len(b) == 0: return None + if len(b) == 0: + return None buf = self.read(len(b)) - b[:len(buf)] = buf + b[: len(buf)] = buf return len(buf) class Untar(ArchivePack): - _mimes = ['application/x-tar'] - _extensions = ['tar'] - __compression = 'tar' - _compression = 'tar' + _mimes = ["application/x-tar"] + _extensions = ["tar"] + __compression = "tar" + _compression = "tar" def __init__(self, name, fileobj): source = make_seekable(fileobj) @@ -68,11 +68,11 @@ def __init__(self, name, fileobj): first_member = self.tarfile.next() if first_member is None: raise OSError("can not read first member of the tar archive") - self._single = (self.tarfile.next() is None) + self._single = self.tarfile.next() is None if self._single: stream = tarlib.ExFileObject(self.tarfile, first_member) stream_name = first_member.name - self._compression += ':' + stream_name + self._compression += ":" + stream_name else: stream_name = name stream = source @@ -98,10 +98,8 @@ def extractall(self, path, members=None): return self.tarfile.extractall(path, members) def close(self): - return super().close() if self.single() \ - else self.tarfile.close() + return super().close() if self.single() else self.tarfile.close() @property def closed(self): - return super().closed if self.single() \ - else self.tarfile.closed + return super().closed if self.single() else self.tarfile.closed diff --git a/destream/decompressors/xz.py b/destream/decompressors/xz.py index 8cc7070..4469069 100644 --- a/destream/decompressors/xz.py +++ b/destream/decompressors/xz.py @@ -1,10 +1,10 @@ from destream import ExternalPipe -__all__ = ['Unxz'] +__all__ = ["Unxz"] class Unxz(ExternalPipe): - _mimes = ['application/x-xz'] - _extensions = ['xz'] - _command = 'unxz -c'.split() - _compression = 'xz' + _mimes = ["application/x-xz"] + _extensions = ["xz"] + _command = "unxz -c".split() + _compression = "xz" diff --git a/destream/decompressors/zip.py b/destream/decompressors/zip.py index bd29af4..c7807a2 100644 --- a/destream/decompressors/zip.py +++ b/destream/decompressors/zip.py @@ -1,16 +1,14 @@ import zipfile -import io -import sys -from destream import ArchivePack, make_seekable, ArchiveFile +from destream import ArchivePack, make_seekable -__all__ = ['Unzip'] +__all__ = ["Unzip"] class Unzip(ArchivePack): - _mimes = ['application/zip'] - _extensions = ['zip'] - _compression = 'zip' + _mimes = ["application/zip"] + _extensions = ["zip"] + _compression = "zip" def __init__(self, name, fileobj): # part of the Zip header is at the end of the file. Therefore, we have @@ -19,9 +17,13 @@ def __init__(self, name, fileobj): fileobj = make_seekable(fileobj) self.zipfile = zipfile.ZipFile(fileobj) if self.single(): - self._compression += ':' + self.members()[0].filename - ArchivePack.__init__(self, name, source=fileobj, - fileobj=(self.single() and self.open(self.members()[0]))) + self._compression += ":" + self.members()[0].filename + ArchivePack.__init__( + self, + name, + source=fileobj, + fileobj=(self.single() and self.open(self.members()[0])), + ) def members(self): return self.zipfile.infolist() diff --git a/destream/decompressors/zstd.py b/destream/decompressors/zstd.py index 78fa70c..e15ee07 100644 --- a/destream/decompressors/zstd.py +++ b/destream/decompressors/zstd.py @@ -1,10 +1,10 @@ from destream import ExternalPipe -__all__ = ['Unzstd'] +__all__ = ["Unzstd"] class Unzstd(ExternalPipe): - _mimes = ['application/zstd', 'application/x-zstd'] - _extensions = ['zst'] - _command = 'unzstd -c'.split() - _compression = 'zstd' + _mimes = ["application/zstd", "application/x-zstd"] + _extensions = ["zst"] + _command = "unzstd -c".split() + _compression = "zstd" diff --git a/destream/guesser.py b/destream/guesser.py index e9e966f..634e62e 100644 --- a/destream/guesser.py +++ b/destream/guesser.py @@ -11,20 +11,30 @@ class Guesser: """ Make a stream using the decompressors given in the constructor """ - def __init__(self, decompressors=builtin_decompressors, - extra_decompressors=[], limit=10): - self.decompressors = decompressors + extra_decompressors + + def __init__( + self, + decompressors=builtin_decompressors, + extra_decompressors=None, + limit=10, + ): + self.decompressors = decompressors + if extra_decompressors: + self.decompressors += extra_decompressors self.limit = limit def guess(self, archive): mime = magic.from_buffer(archive.peek(1024), mime=True) for _, decompressor in sorted(self.decompressors, key=lambda x: x[0]): - if isinstance(archive, ArchivePack) and \ - type(archive) is decompressor: + if ( + isinstance(archive, ArchivePack) + and type(archive) is decompressor + ): continue try: realname = decompressor._guess( - mime, str(archive.realname), archive) + mime, str(archive.realname), archive + ) decompressor._check_availability() return decompressor(realname, archive) except ValueError: diff --git a/destream/helpers.py b/destream/helpers.py index bb137be..32d2ee8 100644 --- a/destream/helpers.py +++ b/destream/helpers.py @@ -20,12 +20,13 @@ class ArchiveFile(Archive): """ Make an archive from a file-object """ + def __init__(self, fileobj=None, name=None, closefd=True): if not fileobj: if not name: raise TypeError("Either name, fileobj must be specified") fileobj = io.FileIO(name) - elif not name and hasattr(fileobj, 'name'): + elif not name and hasattr(fileobj, "name"): name = fileobj.name Archive.__init__(self, name, fileobj, source=fileobj, closefd=closefd) @@ -34,13 +35,14 @@ class ArchiveTemp(Archive): """ Write down a file-object to a temporary file and make an archive from it """ + def __init__(self, fileobj, name=None): if isinstance(fileobj, Archive): - if name is None: name = fileobj.realname + if name is None: + name = fileobj.realname else: name = fileobj.name - tempdir = \ - (os.path.dirname(name) if isinstance(name, str) else None) + tempdir = os.path.dirname(name) if isinstance(name, str) else None try: self.tempfile = tempfile.NamedTemporaryFile(dir=tempdir) except OSError: @@ -67,8 +69,7 @@ def make_seekable(fileobj): f"fileobj must be an instance of io.IOBase or a file, " f"got {type(fileobj)}" ) - return fileobj if fileobj.seekable() \ - else ArchiveTemp(fileobj) + return fileobj if fileobj.seekable() else ArchiveTemp(fileobj) class _ExternalPipeWriter(Thread): @@ -95,35 +96,40 @@ class ExternalPipe(Archive): """ Pipe a file-object to a command and make an archive of the output """ + def __init__(self, name, stdin): - assert type(self) is not ExternalPipe, \ - "This class can not be used in standalone" - assert hasattr(self, '_command'), ( - f"_command attribute is missing in class {type(self)}" - ) + assert ( + type(self) is not ExternalPipe + ), "This class can not be used in standalone" + assert hasattr( + self, "_command" + ), f"_command attribute is missing in class {type(self)}" self.p = Popen(self._command, stdout=PIPE, stdin=PIPE, stderr=PIPE) self.t = _ExternalPipeWriter(stdin, self.p.stdin) - super().__init__(name, fileobj=self.p.stdout, - source=stdin) + super().__init__(name, fileobj=self.p.stdout, source=stdin) self.t.start() @classmethod def _check_availability(cls): - assert cls is not ExternalPipe, \ - "This class can not be used in standalone" - assert hasattr(cls, '_command'), ( - f"_command attribute is missing in class {cls}" - ) + assert ( + cls is not ExternalPipe + ), "This class can not be used in standalone" + assert hasattr( + cls, "_command" + ), f"_command attribute is missing in class {cls}" commands = [cls._command[0]] - if hasattr(cls, '__fallbackcommands__'): + if hasattr(cls, "__fallbackcommands__"): commands += cls.__fallbackcommands__ existing_commands = [x for x in map(find_executable, commands) if x] if not existing_commands: if len(commands) == 1: raise OSError(2, commands[0], "cannot find executable") else: - raise OSError(2, commands[0], - "cannot find executable between: " + ", ".join(commands)) + raise OSError( + 2, + commands[0], + "cannot find executable between: " + ", ".join(commands), + ) cls._command[0] = existing_commands[0] @property diff --git a/scripts/destream b/scripts/destream index 961e682..4758407 100755 --- a/scripts/destream +++ b/scripts/destream @@ -27,16 +27,25 @@ def extract_stream(stdin, stdout=sys.stdout, output_dir=None): filename = "stdout" else: filename = basename(in_.realname) - with open(join_path(output_dir, filename), 'w') as out: + with open(join_path(output_dir, filename), "w") as out: copyfileobj(in_, out) + parser = argparse.ArgumentParser() -parser.add_argument("--force", "-f", action='store_true', - help="Decompress input file even if the input is a tty") -parser.add_argument("--output", "-o", - help="File or directory for output") -parser.add_argument("files", nargs='*', type=argparse.FileType('rb'), - help="List of files to extract.") +parser.add_argument( + "--force", + "-f", + action="store_true", + help="Decompress input file even if the input is a tty", +) +parser.add_argument("--output", "-o", help="File or directory for output") +parser.add_argument( + "files", + nargs="*", + type=argparse.FileType("rb"), + help="List of files to extract.", +) + def run(args): if not args.output: @@ -44,19 +53,21 @@ def run(args): elif isdir(args.output): stdout, output_dir = None, args.output else: - stdout, output_dir = open(args.output, 'w'), None + stdout, output_dir = open(args.output, "w"), None if args.files: for file_ in args.files: extract_stream(file_, stdout=stdout, output_dir=output_dir) else: if not args.force and isatty(sys.stdin.fileno()): - sys.stderr.write("%s: compressed data not read from a terminal. " - "Use -f to force decompression.\n" - % basename(sys.argv[0])) + sys.stderr.write( + "%s: compressed data not read from a terminal. " + "Use -f to force decompression.\n" % basename(sys.argv[0]) + ) sys.exit(1) extract_stream(sys.stdin, stdout=stdout, output_dir=output_dir) -if __name__ == '__main__': + +if __name__ == "__main__": args = parser.parse_args() try: run(args) diff --git a/setup.py b/setup.py index 40bf46a..e61cfb1 100644 --- a/setup.py +++ b/setup.py @@ -2,20 +2,21 @@ from setuptools import setup, find_packages setup( - name = "destream", - version = "5.0.1", - description = ("A simple module to decompress streams compressed multiple " - "times"), + name="destream", + version="5.0.1", + description=( + "A simple module to decompress streams compressed multiple times" + ), long_description=Path("README.md").read_text(), long_description_content_type="text/markdown", - license = "GPLv2", - keywords = "stream file decompress zip zstd", - url = "https://github.com/destream-py/destream", - packages = find_packages(), - scripts = ['scripts/destream'], - install_requires = ['python-magic>=0.4.12'], + license="GPLv2", + keywords="stream file decompress zip zstd", + url="https://github.com/destream-py/destream", + packages=find_packages(), + scripts=["scripts/destream"], + install_requires=["python-magic>=0.4.12"], extras_require={"test": ["tox", "pytest", "pytest-cov"]}, - classifiers = [ + classifiers=[ "Development Status :: 5 - Production/Stable", "Topic :: System :: Archiving :: Compression", "License :: OSI Approved :: GNU General Public License v2 (GPLv2)", diff --git a/tests/test_10_base.py b/tests/test_10_base.py index 3fe4518..f494a11 100644 --- a/tests/test_10_base.py +++ b/tests/test_10_base.py @@ -1,6 +1,7 @@ import os from tempfile import TemporaryFile from io import BytesIO + try: import unittest2 as unittest except ImportError: @@ -10,26 +11,30 @@ class BaseNameTest(Archive): - _extensions = ['ext1', 'ext2'] - _mimes = ['mime1', 'mime2'] + _extensions = ["ext1", "ext2"] + _mimes = ["mime1", "mime2"] class Archive(unittest.TestCase): def test_10_guess_basename(self): - fileobj = BytesIO(b'') + fileobj = BytesIO(b"") try: self.assertEqual( - 'xxx', BaseNameTest._guess('mime2', 'xxx', fileobj)) + "xxx", BaseNameTest._guess("mime2", "xxx", fileobj) + ) self.assertEqual( - 'xxx', BaseNameTest._guess('mime1', 'xxx.ext2', fileobj)) + "xxx", BaseNameTest._guess("mime1", "xxx.ext2", fileobj) + ) self.assertEqual( - 'xxx', BaseNameTest._guess('mime2', 'xxx.ext1', fileobj)) + "xxx", BaseNameTest._guess("mime2", "xxx.ext1", fileobj) + ) except ValueError as e: self.fail(repr(e)) try: self.assertEqual( - 'xxx', BaseNameTest._guess('xxx', 'xxx.ext1', fileobj)) - except ValueError as e: + "xxx", BaseNameTest._guess("xxx", "xxx.ext1", fileobj) + ) + except ValueError: pass else: self.fail("guessing should has failed") @@ -38,14 +43,13 @@ def test_10_guess_basename(self): class ArchiveFileTest(unittest.TestCase): def _regular_tests(self, archive, fileobj, filename, text): self.assertEqual( - archive.fileno(), - fileobj.fileno(), - "file no does not match!") - self.assertEqual(archive.name, filename, - "name attribute does not match!") + archive.fileno(), fileobj.fileno(), "file no does not match!" + ) + self.assertEqual( + archive.name, filename, "name attribute does not match!" + ) archive.seek(0) - self.assertEqual(archive.read(), text, - "file content does not match!") + self.assertEqual(archive.read(), text, "file content does not match!") def test_10_passing_file_object(self): text = b"Hello World!\n" @@ -74,30 +78,31 @@ def test_30_closefd(self): class CatsEye(ExternalPipe): - _command = ['cat'] - _compression = 'cat' + _command = ["cat"] + _compression = "cat" _unique_instance = True class ExternalPipeTest(unittest.TestCase): def _regular_tests(self, pipe, filename, text): - self.assertEqual(pipe.realname, filename, - "name attribute does not match!") + self.assertEqual( + pipe.realname, filename, "name attribute does not match!" + ) self.assertEqual(pipe.read(), text, "file content does not match!") - self.assertEqual(pipe.read(), b'', "should be the end of file") + self.assertEqual(pipe.read(), b"", "should be the end of file") def test_10_check_output(self): text = b"Hello World\n" - filename = '' + filename = "" fileobj = BytesIO(text) with CatsEye(filename, fileobj) as pipe: try: - CatsEye._guess('', filename, pipe) + CatsEye._guess("", filename, pipe) except ValueError: pass else: self.fail("CatsEye is _unique_instance = True") - self.assertEqual(pipe.compressions, ['cat']) + self.assertEqual(pipe.compressions, ["cat"]) self.assertEqual(pipe._decompressors, [CatsEye]) self._regular_tests(pipe, filename, text) @@ -109,9 +114,10 @@ def test_10_create_temp_archive_from_externalpipe(self): fileobj = BytesIO(text) with CatsEye(filename, fileobj) as pipe: temp = ArchiveTemp(pipe) - self.assertEqual(pipe.read(), b'', "should be the end of file") + self.assertEqual(pipe.read(), b"", "should be the end of file") self.assertEqual( os.path.dirname(os.path.abspath(filename)), os.path.dirname(os.path.abspath(temp.name)), - "Temp file and temp archive should be in the same directory") + "Temp file and temp archive should be in the same directory", + ) self.assertEqual(temp.read(), text) diff --git a/tests/test_30_decompressors.py b/tests/test_30_decompressors.py index b7893ae..339705f 100644 --- a/tests/test_30_decompressors.py +++ b/tests/test_30_decompressors.py @@ -2,11 +2,11 @@ import os import tempfile import shutil -import sys from io import BytesIO import tarfile import zipfile import magic + try: import unittest2 as unittest except ImportError: @@ -16,8 +16,13 @@ class GuesserTest(unittest.TestCase): - def _check_decompressor(self, decompressor, compressed_fileobj, - decompressed_fileobj, expected_name=None): + def _check_decompressor( + self, + decompressor, + compressed_fileobj, + decompressed_fileobj, + expected_name=None, + ): try: decompressor._check_availability() except AttributeError: @@ -31,56 +36,78 @@ def _check_decompressor(self, decompressor, compressed_fileobj, decompressor._guess(mime, str(archive.realname), compressed_fileobj) compressed_fileobj.seek(0) with destream.open( - fileobj=compressed_fileobj, closefd=False) as archive: + fileobj=compressed_fileobj, closefd=False + ) as archive: # check that the decompressor has been used self.assertIn( decompressor, - archive._decompressors, "the decompressor didn't apply") + archive._decompressors, + "the decompressor didn't apply", + ) self.assertIn( decompressor._compression, - (x.split(':')[0] for x in archive.compressions), - "archive's compressions is bad") + (x.split(":")[0] for x in archive.compressions), + "archive's compressions is bad", + ) # check that the cursor is at the beginning of the file # (not available for streams) if archive.seekable(): - self.assertEqual(archive.tell(), 0, - "the archive cursor should be on position 0") + self.assertEqual( + archive.tell(), + 0, + "the archive cursor should be on position 0", + ) # check that the realname with extension match the source realname - if not isinstance(archive, destream.ArchivePack) \ - or archive.single(): - self.assertEqual(archive.read(), decompressed_fileobj.read(), - "content does not match") + if ( + not isinstance(archive, destream.ArchivePack) + or archive.single() + ): + self.assertEqual( + archive.read(), + decompressed_fileobj.read(), + "content does not match", + ) # check that the realname of archive is the same than the # single file member if isinstance(archive, destream.ArchivePack): - filename = getattr(archive.members()[0], 'filename', - getattr(archive.members()[0], 'name', None)) + filename = getattr( + archive.members()[0], + "filename", + getattr(archive.members()[0], "name", None), + ) self.assertEqual( - archive.realname, os.path.basename(filename), + archive.realname, + os.path.basename(filename), "the archive should have a realname set on the " - "single member's filename") + "single member's filename", + ) if expected_name is not None: self.assertEqual( - archive.realname, expected_name, + archive.realname, + expected_name, "the file inside the archive does not have " - "the right name") + "the right name", + ) else: # check that archive realname with extension match its source # realname self.assertEqual( - archive.realname + '.' - + decompressor._extensions[0], + archive.realname + "." + decompressor._extensions[0], archive.source.realname, - "expected archive name does not match") + "expected archive name does not match", + ) # test source archive archive.seek(0) - archive_content = archive.read() + archive.read() archive.source.seek(0) - source_content = archive.source.read() - self.assertEqual(archive.read(), archive.source.read(), + archive.source.read() + self.assertEqual( + archive.read(), + archive.source.read(), "content should have the same content than source archive " - "for archives having multiple files") + "for archives having multiple files", + ) # test open() # TODO: depending on the decompressor, open() should be tested # with different arguments (like stream=False) @@ -89,25 +116,29 @@ def _check_decompressor(self, decompressor, compressed_fileobj, self.assertEqual( fileobj.read(), decompressed_fileobj.read(), - "content does not match") + "content does not match", + ) # test extract() tempdir = tempfile.mkdtemp() try: for member in archive.members(): - if hasattr(member, 'isfile') and not member.isfile(): + if hasattr(member, "isfile") and not member.isfile(): continue archive.extract(member, tempdir) - filename = getattr(member, 'filename', - getattr(member, 'name', None)) + filename = getattr( + member, "filename", getattr(member, "name", None) + ) if filename is None: raise AttributeError( f"{type(member).__name__} instance has no " f"attribute 'filename' nor 'name'" ) filepath = os.path.join(tempdir, filename) - self.assertTrue(os.path.isfile(filepath), + self.assertTrue( + os.path.isfile(filepath), "can not extract using extract() method: " - + filepath) + + filepath, + ) finally: shutil.rmtree(tempdir) # test extractall() @@ -115,21 +146,24 @@ def _check_decompressor(self, decompressor, compressed_fileobj, try: archive.extractall(tempdir) for member in archive.members(): - filename = getattr(member, 'filename', - getattr(member, 'name', None)) + filename = getattr( + member, "filename", getattr(member, "name", None) + ) if filename is None: raise AttributeError( f"{type(member).__name__} instance has no " f"attribute 'filename' nor 'name'" ) filepath = os.path.join(tempdir, filename) - self.assertTrue(os.path.exists(filepath), + self.assertTrue( + os.path.exists(filepath), "can not extract using extract() method: " - + filepath) + + filepath, + ) finally: shutil.rmtree(tempdir) # force closing archive by deleting the instance - archive = None + del archive self.assertFalse(compressed_fileobj.closed) self.assertFalse(decompressed_fileobj.closed) @@ -137,55 +171,59 @@ def test_10_plain_text(self): fileobj = BytesIO(b"Hello World\n") fileobj.name = "test_file.txt" guessed = destream.open(fileobj=fileobj) - self.assertEqual(guessed.compressions, [], - "should not have compressions") + self.assertEqual( + guessed.compressions, [], "should not have compressions" + ) fileobj.seek(0) - self.assertEqual(fileobj.read(), guessed.read(), - "should have the same content") + self.assertEqual( + fileobj.read(), guessed.read(), "should have the same content" + ) self.assertEqual(guessed.realname, fileobj.name) def test_20_external_pipe_lzma(self): uncompressed = BytesIO(b"Hello World\n") - uncompressed.name = 'test_file' + uncompressed.name = "test_file" raw = BytesIO( - b']\x00\x00\x80\x00\xff\xff\xff\xff\xff\xff\xff\xff\x00' - b'$\x19I\x98o\x10\x11\xc8_\xe6\xd5\x8a\x04\xda\x01\xc7' - b'\xff\xff\x0b8\x00\x00') + b"]\x00\x00\x80\x00\xff\xff\xff\xff\xff\xff\xff\xff\x00" + b"$\x19I\x98o\x10\x11\xc8_\xe6\xd5\x8a\x04\xda\x01\xc7" + b"\xff\xff\x0b8\x00\x00" + ) raw.name = "test_file.lzma" self._check_decompressor( - destream.decompressors.Unlzma, - raw, uncompressed) + destream.decompressors.Unlzma, raw, uncompressed + ) def test_20_external_pipe_gzip(self): uncompressed = BytesIO(b"Hello World\n") - uncompressed.name = 'test_file' + uncompressed.name = "test_file" raw = BytesIO( - b'\x1f\x8b\x08\x00\x96\xfa\rS\x00\x03\xf3H\xcd\xc9\xc9W\x08\xcf' - b'/\xcaI\xe1\x02\x00\xe3\xe5\x95\xb0\x0c\x00\x00\x00') + b"\x1f\x8b\x08\x00\x96\xfa\rS\x00\x03\xf3H\xcd\xc9\xc9W\x08\xcf" + b"/\xcaI\xe1\x02\x00\xe3\xe5\x95\xb0\x0c\x00\x00\x00" + ) for ext, expected_name in [ - ('.gz', uncompressed.name), - ('.GZ', uncompressed.name), - ('-gz', uncompressed.name), - ('.z', uncompressed.name), - ('-z', uncompressed.name), - ('_z', uncompressed.name), - ('.tgz', uncompressed.name + '.tar'), - ('.taz', uncompressed.name + '.tar'), - ('.TAZ', uncompressed.name + '.tar'), - ]: + (".gz", uncompressed.name), + (".GZ", uncompressed.name), + ("-gz", uncompressed.name), + (".z", uncompressed.name), + ("-z", uncompressed.name), + ("_z", uncompressed.name), + (".tgz", uncompressed.name + ".tar"), + (".taz", uncompressed.name + ".tar"), + (".TAZ", uncompressed.name + ".tar"), + ]: uncompressed.seek(0) raw.seek(0) raw.name = "test_file" + ext self._check_decompressor( - destream.decompressors.Gunzip, - raw, uncompressed, expected_name) + destream.decompressors.Gunzip, raw, uncompressed, expected_name + ) def test_30_tar_single_file(self): uncompressed = BytesIO(b"Hello World\n") - uncompressed.name = 'test_file' + uncompressed.name = "test_file" raw = BytesIO() raw.name = "test_file.tar" - tar = tarfile.open(fileobj=raw, mode='w') + tar = tarfile.open(fileobj=raw, mode="w") try: tarinfo = tarfile.TarInfo(uncompressed.name) tarinfo.size = len(uncompressed.getvalue()) @@ -195,17 +233,17 @@ def test_30_tar_single_file(self): tar.close() raw.seek(0) self._check_decompressor( - destream.decompressors.Untar, - raw, uncompressed) + destream.decompressors.Untar, raw, uncompressed + ) def test_40_tar_multiple_files(self): uncompressed = BytesIO(b"Hello World\n") uncompressed.name = None raw = BytesIO() raw.name = "test_file.tar" - tar = tarfile.open(fileobj=raw, mode='w') + tar = tarfile.open(fileobj=raw, mode="w") try: - for filename in ('a/test_file1', 'b/test_file2'): + for filename in ("a/test_file1", "b/test_file2"): tarinfo = tarfile.TarInfo(filename) tarinfo.size = len(uncompressed.getvalue()) uncompressed.seek(0) @@ -214,143 +252,153 @@ def test_40_tar_multiple_files(self): tar.close() raw.seek(0) self._check_decompressor( - destream.decompressors.Untar, - raw, uncompressed) + destream.decompressors.Untar, raw, uncompressed + ) def test_20_external_pipe_xz(self): uncompressed = BytesIO(b"Hello World\n") - uncompressed.name = 'test_file' + uncompressed.name = "test_file" raw = BytesIO( - b'\xfd7zXZ\x00\x00\x04\xe6\xd6\xb4F\x02\x00!\x01\x16\x00\x00\x00' + b"\xfd7zXZ\x00\x00\x04\xe6\xd6\xb4F\x02\x00!\x01\x16\x00\x00\x00" b't/\xe5\xa3\x01\x00\x0bHello World\n\x00"\xe0u?\xd5\xed8>\x00\x01' - b'$\x0c\xa6\x18\xd8\xd8\x1f\xb6\xf3}\x01\x00\x00\x00\x00\x04YZ') + b"$\x0c\xa6\x18\xd8\xd8\x1f\xb6\xf3}\x01\x00\x00\x00\x00\x04YZ" + ) raw.name = "test_file.xz" self._check_decompressor( - destream.decompressors.Unxz, - raw, uncompressed) + destream.decompressors.Unxz, raw, uncompressed + ) def test_20_external_pipe_zstd(self): uncompressed = BytesIO(b"Hello World\n") - uncompressed.name = 'test_file' - raw = BytesIO( - b'(\xb5/\xfd$\x0ca\x00\x00Hello World\n\x93C\x0f\x1a') + uncompressed.name = "test_file" + raw = BytesIO(b"(\xb5/\xfd$\x0ca\x00\x00Hello World\n\x93C\x0f\x1a") raw.name = "test_file.zst" self._check_decompressor( - destream.decompressors.Unzstd, - raw, uncompressed) + destream.decompressors.Unzstd, raw, uncompressed + ) def test_30_7z_single_file(self): uncompressed = BytesIO(b"Hello World\n") uncompressed.name = None # no file, only the content is packed, use 7zr -si to make it - raw = BytesIO(b"7z\xbc\xaf'\x1c\x00\x03\\\x01\xca\xbe\x11\x00\x00\x00" + raw = BytesIO( + b"7z\xbc\xaf'\x1c\x00\x03\\\x01\xca\xbe\x11\x00\x00\x00" b"\x00\x00\x00\x00;\x00\x00\x00\x00\x00\x00\x00\xccl\x1bR\x00" b"$\x19I\x98o\x10\x11\xc8_\xe6\xd5\x8a\x02\x8f\x14\x00\x01\x04" b"\x06\x00\x01\t\x11\x00\x07\x0b\x01\x00\x01#\x03\x01\x01\x05]" b"\x00\x00\x00\x01\x0c\x0c\x00\x08\n\x01\xe3\xe5\x95\xb0\x00" b"\x00\x05\x01\x14\n\x01\x00\xc0\x8dZ!\xf62\xcf\x01\x15\x06" - b"\x01\x00\x00\x00\x00\x00\x00\x00") + b"\x01\x00\x00\x00\x00\x00\x00\x00" + ) self._check_decompressor( - destream.decompressors.Un7z, - raw, uncompressed) + destream.decompressors.Un7z, raw, uncompressed + ) uncompressed = BytesIO(b"Hello World\n") - uncompressed.name = 'a' + uncompressed.name = "a" # only one file, named, but same content - raw = BytesIO(b"7z\xbc\xaf'\x1c\x00\x03+v\xeet\x11\x00\x00\x00\x00\x00" + raw = BytesIO( + b"7z\xbc\xaf'\x1c\x00\x03+v\xeet\x11\x00\x00\x00\x00\x00" b"\x00\x00B\x00\x00\x00\x00\x00\x00\x00\x10\xb9\x06\x02\x00$" b"\x19I\x98o\x10\x11\xc8_\xe6\xd5\x8a\x02\x8f\x14\x00\x01\x04" b"\x06\x00\x01\t\x11\x00\x07\x0b\x01\x00\x01#\x03\x01\x01\x05" b"]\x00\x00\x01\x00\x0c\x0c\x00\x08\n\x01\xe3\xe5\x95\xb0\x00" b"\x00\x05\x01\x11\x05\x00a\x00\x00\x00\x14\n\x01\x00\x80]]\\" - b"\xf62\xcf\x01\x15\x06\x01\x00 \x80\xa4\x81\x00\x00") + b"\xf62\xcf\x01\x15\x06\x01\x00 \x80\xa4\x81\x00\x00" + ) raw.name = "test_file.7z" self._check_decompressor( - destream.decompressors.Un7z, - raw, uncompressed) + destream.decompressors.Un7z, raw, uncompressed + ) def test_40_7z_multiple_files(self): uncompressed = BytesIO(b"Hello World\n") uncompressed.name = None raw = BytesIO( - b'7z\xbc\xaf\'\x1c\x00\x03\x10\xads\x82x\x00\x00\x00\x00\x00\x00' - b'\x00!\x00\x00\x00\x00\x00\x00\x00\x7f$\xaa\x86\x00$\x19I\x98o' - b'\x10\x11\xc8_\xe6\xd5\x8a\x05U3\x9d`\x00\x00\x00\x813\x07\xae' - b'\x0f\xcf\'\xf0\x8c\x07\xc8C\x80\x83\x81[\xff\xac\x80\x1dP\x19' - b'\xff\xf6\xf8\x17!l\xa9\xf9r\x19\x1b^y\xee#r\xd7\x15\xd2\xfc\xe1' + b"7z\xbc\xaf'\x1c\x00\x03\x10\xads\x82x\x00\x00\x00\x00\x00\x00" + b"\x00!\x00\x00\x00\x00\x00\x00\x00\x7f$\xaa\x86\x00$\x19I\x98o" + b"\x10\x11\xc8_\xe6\xd5\x8a\x05U3\x9d`\x00\x00\x00\x813\x07\xae" + b"\x0f\xcf'\xf0\x8c\x07\xc8C\x80\x83\x81[\xff\xac\x80\x1dP\x19" + b"\xff\xf6\xf8\x17!l\xa9\xf9r\x19\x1b^y\xee#r\xd7\x15\xd2\xfc\xe1" b'\x17\xfa\xaa"\xafV\x05\xd7>\x1c\xf5\x93\xb5!R\x11\xdcMP\xf6\xab' - b'\xc7\xd5\xc9\xbdj*{\xffp\x81\xbd\xf9\xbd\xf3\x87W\xfe\xa3F\xa3~&' - b'(\xdc{\xd4\xb6Z\x9d\x98Dj \x00\x00\x17\x06\x13\x01\te\x00\x07' - b'\x0b\x01\x00\x01#\x03\x01\x01\x05]\x00\x10\x00\x00\x0c\x80\x85' - b'\n\x01pF\xbb5\x00\x00') + b"\xc7\xd5\xc9\xbdj*{\xffp\x81\xbd\xf9\xbd\xf3\x87W\xfe\xa3F\xa3~&" + b"(\xdc{\xd4\xb6Z\x9d\x98Dj \x00\x00\x17\x06\x13\x01\te\x00\x07" + b"\x0b\x01\x00\x01#\x03\x01\x01\x05]\x00\x10\x00\x00\x0c\x80\x85" + b"\n\x01pF\xbb5\x00\x00" + ) raw.name = "test_file.7z" self._check_decompressor( - destream.decompressors.Un7z, - raw, uncompressed) + destream.decompressors.Un7z, raw, uncompressed + ) def test_30_zip_single_file(self): uncompressed = BytesIO(b"Hello World\n") - uncompressed.name = 'test_file' + uncompressed.name = "test_file" raw = BytesIO() raw.name = "test_file.zip" - zip = zipfile.ZipFile(raw, 'w') + zip = zipfile.ZipFile(raw, "w") try: zip.writestr("test_file", uncompressed.getvalue()) finally: zip.close() raw.seek(0) self._check_decompressor( - destream.decompressors.Unzip, - raw, uncompressed) + destream.decompressors.Unzip, raw, uncompressed + ) def test_40_zip_multiple_files(self): uncompressed = BytesIO(b"Hello World\n") uncompressed.name = None raw = BytesIO() raw.name = "test_file.zip" - zip = zipfile.ZipFile(raw, 'w') + zip = zipfile.ZipFile(raw, "w") try: - for filename in ('a/test_file1', 'b/test_file2'): + for filename in ("a/test_file1", "b/test_file2"): zip.writestr(filename, uncompressed.getvalue()) finally: zip.close() raw.seek(0) self._check_decompressor( - destream.decompressors.Unzip, - raw, uncompressed) + destream.decompressors.Unzip, raw, uncompressed + ) def test_20_external_pipe_bzip2(self): uncompressed = BytesIO(b"Hello World\n") uncompressed.name = "test_file" raw = BytesIO( - b'BZh91AY&SY\xd8r\x01/\x00\x00\x01W\x80\x00\x10@\x00\x00@\x00' + b"BZh91AY&SY\xd8r\x01/\x00\x00\x01W\x80\x00\x10@\x00\x00@\x00" b'\x80\x06\x04\x90\x00 \x00"\x06\x86\xd4 \xc9\x88\xc7i\xe8(\x1f' - b'\x8b\xb9"\x9c(Hl9\x00\x97\x80') + b'\x8b\xb9"\x9c(Hl9\x00\x97\x80' + ) for ext, expected_name in [ - ('.bz2', uncompressed.name), - ('.bz', uncompressed.name), - ('.tbz', uncompressed.name + '.tar'), - ('.tbz2', uncompressed.name + '.tar'), - ]: + (".bz2", uncompressed.name), + (".bz", uncompressed.name), + (".tbz", uncompressed.name + ".tar"), + (".tbz2", uncompressed.name + ".tar"), + ]: uncompressed.seek(0) raw.seek(0) raw.name = "test_file" + ext self._check_decompressor( destream.decompressors.Bunzip2, - raw, uncompressed, expected_name) + raw, + uncompressed, + expected_name, + ) def test_30_rar_single_file(self): uncompressed = BytesIO(b"Hello World\n") - uncompressed.name = 'a' + uncompressed.name = "a" raw = BytesIO( b"Rar!\x1a\x07\x00\xcf\x90s\x00\x00\r\x00\x00\x00\x00\x00\x00\x00" b"\x98\xdct \x90#\x00\x19\x00\x00\x00\x0c\x00\x00\x00\x03\xe3\xe5" b"\x95\xb0\x05|[D\x1d3\x01\x00\xa4\x81\x00\x00a\x00\xc0\x0c\x0c" b"\xcb\xec\xcb\xf1\x14'\x04\x18\x81\x0e\xec\x9aL\xff\xe3?\xfe\xcf" - b"\x05z\x99\xd5\x10\xc4={\x00@\x07\x00") + b"\x05z\x99\xd5\x10\xc4={\x00@\x07\x00" + ) raw.name = "test_file.rar" self._check_decompressor( - destream.decompressors.Unrar, - raw, uncompressed) + destream.decompressors.Unrar, raw, uncompressed + ) def test_40_rar_multiple_files(self): uncompressed = BytesIO(b"Hello World\n") @@ -366,22 +414,23 @@ def test_40_rar_multiple_files(self): b"\x05z\x99\xd5\x10\x98~t\xe0\x80!\x00\x00\x00\x00\x00\x00\x00\x00" b"\x00\x03\x00\x00\x00\x007T\\D\x140\x01\x00\xedA\x00\x00a[\x99t" b"\xe0\x90#\x00\x00\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00" - b"\x00=T\\D\x140\x01\x00\xedA\x00\x00c\x00\xc0\xc4={\x00@\x07\x00") + b"\x00=T\\D\x140\x01\x00\xedA\x00\x00c\x00\xc0\xc4={\x00@\x07\x00" + ) raw.name = "test_file.rar" self._check_decompressor( - destream.decompressors.Unrar, - raw, uncompressed) + destream.decompressors.Unrar, raw, uncompressed + ) def test_50_object_closed_on_delete(self): - with tempfile.NamedTemporaryFile('w+b') as fh: + with tempfile.NamedTemporaryFile("w+b") as fh: # NOTE: the file must be big enough - with gzip.open(fh.name, 'w+b') as gzipped: + with gzip.open(fh.name, "w+b") as gzipped: for i in range(3000): gzipped.write(os.urandom(1024)) archive = destream.open(fh.name) self.assertIn( - destream.decompressors.Gunzip, - archive._decompressors) + destream.decompressors.Gunzip, archive._decompressors + ) proc = archive.p thread = archive.t del archive @@ -390,6 +439,6 @@ def test_50_object_closed_on_delete(self): archive2 = destream.open(fh.name) proc2 = archive2.p thread2 = archive2.t - archive2 = None + del archive2 self.assertIsNotNone(proc2.poll()) self.assertFalse(thread2.is_alive())