Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Apply black -l 79 and do some code cleanup #22

Merged
merged 5 commits into from
Jan 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ Lib Usage
```python
archive = destream.open("some_file.tar.gz")

assert isinstance(archive, destream.ArchivePack) \
and isinstance(archive.tarfile, tarfile.TarFile)
assert (isinstance(archive, destream.ArchivePack) and
isinstance(archive.tarfile, tarfile.TarFile))

# ==> we can extract members using extract() and extractall()
archive.extractall("/tmp")
Expand Down
43 changes: 26 additions & 17 deletions destream/archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,18 @@
""".split()


RE_EXTENSION = re.compile(r'^(.*?)(\.([^.]+))?$')
RE_EXTENSION = re.compile(r"^(.*?)(\.([^.]+))?$")


class Archive(io.BufferedReader):
"""
Base class to Archive file
"""

def __init__(self, name, fileobj=None, source=None, closefd=True):
assert type(self) != Archive, \
"This class can not be used in standalone"
assert (
type(self) != Archive
), "This class can not be used in standalone"
if not fileobj:
fileobj = io.BytesIO()
elif sys.version_info < (3, 0) and isinstance(fileobj, file):
Expand All @@ -32,7 +34,7 @@ def __init__(self, name, fileobj=None, source=None, closefd=True):
f"got {type(fileobj)}"
)
io.BufferedReader.__init__(self, fileobj)
self.realname = name or ''
self.realname = name or ""
self.source = source
self.closefd = closefd
if isinstance(source, Archive):
Expand All @@ -41,43 +43,46 @@ def __init__(self, name, fileobj=None, source=None, closefd=True):
else:
self._decompressors = [type(self)]
self.compressions = []
if hasattr(self, '_compression'):
if hasattr(self, "_compression"):
self.compressions += [self._compression]

@classmethod
def _check_availability(self):
def _check_availability(cls):
pass

@classmethod
def _guess(cls, mime, name, fileobj):
eumiro marked this conversation as resolved.
Show resolved Hide resolved
if getattr(cls, '_unique_instance', False):
if getattr(cls, "_unique_instance", False):
if cls in fileobj._decompressors:
raise ValueError(
f"class {cls} already in the decompressor list"
)
realname = name
if hasattr(cls, '_mimes'):
if hasattr(cls, "_mimes"):
match = RE_EXTENSION.search(name)
if hasattr(cls, '_extensions') and \
match.group(2) and \
os.path.normcase(match.group(3)) in cls._extensions:
if (
hasattr(cls, "_extensions")
and match.group(2)
and os.path.normcase(match.group(3)) in cls._extensions
):
realname = match.group(1)
if mime not in cls._mimes:
raise ValueError(
(cls, mime, name, fileobj),
f"can not decompress fileobj using class {cls.__name__}"
f"can not decompress fileobj using class {cls.__name__}",
)
return realname

def close(self):
if getattr(self, 'closefd', True):
if getattr(self, "closefd", True):
super().close()


class ArchivePack(Archive):
"""
Base class for an archive that is also a pack of file (tar, zip, ...)
"""

def __init__(self, name, fileobj=None, source=None):
Archive.__init__(self, name, fileobj, source=source)

Expand All @@ -86,16 +91,20 @@ def single(self):

def members(self):
raise NotImplementedError(
f"class {type(self)} does not implement this method")
f"class {type(self)} does not implement this method"
)
eumiro marked this conversation as resolved.
Show resolved Hide resolved

def open(self, member):
raise NotImplementedError(
f"class {type(self)} does not implement this method")
f"class {type(self)} does not implement this method"
)

def extract(self, member, path):
raise NotImplementedError(
f"class {type(self)} does not implement this method")
f"class {type(self)} does not implement this method"
)

def extractall(self, path, members=None):
raise NotImplementedError(
f"class {type(self)} does not implement this method")
f"class {type(self)} does not implement this method"
)
5 changes: 3 additions & 2 deletions destream/decompressors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@

builtin_decompressors = [
(0, symbol)
for name, symbol
in inspect.getmembers(sys.modules[__name__], inspect.isclass)
for name, symbol in inspect.getmembers(
sys.modules[__name__], inspect.isclass
)
if issubclass(symbol, archive.Archive)
]
14 changes: 7 additions & 7 deletions destream/decompressors/bzip2.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
from destream import ExternalPipe

__all__ = ['Bunzip2']
__all__ = ["Bunzip2"]


class Bunzip2(ExternalPipe):
_mimes = ['application/x-bzip2']
_extensions = ['bz2', 'bz', 'tbz2', 'tbz']
_command = ['bunzip2']
_compression = 'bzip2'
_mimes = ["application/x-bzip2"]
_extensions = ["bz2", "bz", "tbz2", "tbz"]
_command = ["bunzip2"]
_compression = "bzip2"

@classmethod
def _guess(cls, mime, name, fileobj):
is_tar = name.endswith('.tbz2') or name.endswith('.tbz')
is_tar = name.endswith(".tbz2") or name.endswith(".tbz")
realname = super()._guess(mime, name, fileobj)
return realname + '.tar' if is_tar else realname
return realname + ".tar" if is_tar else realname
26 changes: 13 additions & 13 deletions destream/decompressors/gzip.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,34 @@
from destream import ExternalPipe

__all__ = ['Gunzip']
__all__ = ["Gunzip"]


class Gunzip(ExternalPipe):
_mimes = [
'application/x-gzip',
'application/gzip',
"application/x-gzip",
"application/gzip",
]
_extensions = ['gz']
_command = ['gunzip']
_compression = 'gzip'
_extensions = ["gz"]
_command = ["gunzip"]
_compression = "gzip"

@classmethod
def _guess(cls, mime, name, fileobj):
if mime not in cls._mimes:
raise ValueError("not a gzip compression")
lowered = name.lower()
if lowered.endswith('.gz'):
if lowered.endswith(".gz"):
realname = name[:-3]
elif lowered.endswith('-gz'):
elif lowered.endswith("-gz"):
realname = name[:-3]
elif lowered.endswith('.z'):
elif lowered.endswith(".z"):
realname = name[:-2]
elif lowered.endswith('-z'):
elif lowered.endswith("-z"):
realname = name[:-2]
elif lowered.endswith('_z'):
elif lowered.endswith("_z"):
realname = name[:-2]
elif lowered.endswith('.tgz') or lowered.endswith('.taz'):
realname = name[:-4] + '.tar'
elif lowered.endswith(".tgz") or lowered.endswith(".taz"):
realname = name[:-4] + ".tar"
else:
realname = name
return realname
10 changes: 5 additions & 5 deletions destream/decompressors/lzma.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from destream import ExternalPipe

__all__ = ['Unlzma']
__all__ = ["Unlzma"]


class Unlzma(ExternalPipe):
_mimes = ['application/x-lzma']
_extensions = ['lzma']
_command = 'unlzma -c'.split()
_compression = 'lzma'
_mimes = ["application/x-lzma"]
_extensions = ["lzma"]
_command = "unlzma -c".split()
_compression = "lzma"
Loading