From 136ec7de01776126ea813afcb76e9c25914cf05a Mon Sep 17 00:00:00 2001 From: Pavel Zubarev <40171187+pavelzubarev@users.noreply.github.com> Date: Mon, 30 Oct 2023 18:48:10 +0100 Subject: [PATCH] Fix multipart cache (#104) --- CHANGELOG.md | 5 ++ cognite/cdffs/__init__.py | 2 +- cognite/cdffs/az_upload_strategy.py | 12 ++++- cognite/cdffs/file_handler.py | 10 +++- cognite/cdffs/gcp_upload_strategy.py | 12 +++-- cognite/cdffs/spec.py | 76 ++++++++++++++++++++++------ poetry.lock | 64 ++++++----------------- pyproject.toml | 4 +- 8 files changed, 110 insertions(+), 75 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b176eeb..42102a1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -141,3 +141,8 @@ Changes are grouped as follows ### Added - Support for native multipart file upload for CDF in Azure and Google. + +## [0.2.10] - 2023-11-1 + +### Fixed +- Fix internal cache accumulated if big files are handled with native multipart implementation diff --git a/cognite/cdffs/__init__.py b/cognite/cdffs/__init__.py index e7d4329..1f2cd0b 100644 --- a/cognite/cdffs/__init__.py +++ b/cognite/cdffs/__init__.py @@ -3,7 +3,7 @@ from .spec import CdfFileSystem -__version__ = "0.2.9" +__version__ = "0.2.10" __all__ = ["CdfFileSystem"] fsspec.register_implementation(CdfFileSystem.protocol, CdfFileSystem) diff --git a/cognite/cdffs/az_upload_strategy.py b/cognite/cdffs/az_upload_strategy.py index f1509b8..2133341 100644 --- a/cognite/cdffs/az_upload_strategy.py +++ b/cognite/cdffs/az_upload_strategy.py @@ -15,6 +15,7 @@ def __init__(self, metadata: FileMetadata, cognite_client: CogniteClient): """Initializer.""" super().__init__(metadata, cognite_client) self.total_size = 0 + self.logger = logging.getLogger("cdffs.AzureUploadStrategy") def _generate_block_blob_block_id(self, index: int, block_name_prefix: str = __file__.__str__()) -> str: block_id = f"{block_name_prefix.ljust(19, 'x')[:19]}{index:05}".encode("utf-8") @@ -37,10 +38,14 @@ def upload_chunk(self, data: bytes, index: int) -> None: }, ) response.raise_for_status() - self.total_size += len(data) # track total object size with self.lock: + self.total_size += len(data) # track total object size self.indexes.append(index) - logging.info(f"Finished uploading block {index}. Took {response.elapsed.total_seconds()} sec") + + self.logger.debug( + f"Finished uploading block {index}. Current file size: {self.total_size}. " + f"Took {response.elapsed.total_seconds()} sec" + ) def merge_chunks(self) -> int: """Merge all uploaded blocks into the final blob.""" @@ -62,5 +67,8 @@ def merge_chunks(self) -> int: }, ) response.raise_for_status() + self.logger.debug( + f"Merged blocks. Total file size: {self.total_size}. " f"Took {response.elapsed.total_seconds()} sec" + ) return self.total_size diff --git a/cognite/cdffs/file_handler.py b/cognite/cdffs/file_handler.py index 151179a..812d0fc 100644 --- a/cognite/cdffs/file_handler.py +++ b/cognite/cdffs/file_handler.py @@ -32,7 +32,10 @@ def __init__(self) -> None: self.session = requests.Session() def download_file( - self, download_url: str, start_byte: Union[int, None] = None, end_byte: Union[int, None] = None + self, + download_url: str, + start_byte: Union[int, None] = None, + end_byte: Union[int, None] = None, ) -> bytes: """Download the file from a cloud storage using the download URL & offsets provided. @@ -65,7 +68,10 @@ def add_or_update_url(self, external_id: str, download_url: str) -> None: external_id (str): External Id for the file. download_url (str): Download URL for the file. 
""" - self._url_container[external_id] = {"url": download_url, "expiry_time": time.time()} + self._url_container[external_id] = { + "url": download_url, + "expiry_time": time.time(), + } def get_url(self, external_id: str) -> Any: """Get download url from a cache if they are valid. diff --git a/cognite/cdffs/gcp_upload_strategy.py b/cognite/cdffs/gcp_upload_strategy.py index 8523650..af3fb4e 100644 --- a/cognite/cdffs/gcp_upload_strategy.py +++ b/cognite/cdffs/gcp_upload_strategy.py @@ -18,18 +18,24 @@ def __init__(self, metadata: FileMetadata, cognite_client: CogniteClient): self.chunk_cache: Dict = OrderedDict() # Store chunks in the order received self.last_written_index = -1 # The last consecutive chunk index that was written self.last_written_byte = -1 # The last byte position that was written + self.logger = logging.getLogger("cdffs.GoogleUploadStrategy") def _write_chunk(self, index: int, data: bytes) -> None: start_byte = self.last_written_byte + 1 end_byte = start_byte + len(data) - 1 - headers = {"Content-Length": str(len(data)), "Content-Range": f"bytes {start_byte}-{end_byte}/*"} + headers = { + "Content-Length": str(len(data)), + "Content-Range": f"bytes {start_byte}-{end_byte}/*", + } response = self.session.put(self.params["upload_url"], headers=headers, data=data) response.raise_for_status() self.indexes.append(index) - logging.info(f"Finished uploading chunk {index}. Took {response.elapsed.total_seconds()} sec") + self.logger.debug( + f"Finished uploading chunk {index}=[{start_byte}:{end_byte}]. Took {response.elapsed.total_seconds()} sec" + ) # Update the last written byte position self.last_written_byte = end_byte @@ -46,7 +52,7 @@ def upload_chunk(self, data: bytes, index: int) -> None: del self.chunk_cache[next_index] # Remove the written chunk from cache self.last_written_index = next_index - logging.info(f"Received chunk {index}. Cache size: {len(self.chunk_cache)} chunks") + self.logger.debug(f"Received chunk {index}. Cache size: {len(self.chunk_cache)} chunks") def merge_chunks(self) -> int: """Google Cloud Storage handles merging internally. So, this method is a no-op for the GCS strategy.""" diff --git a/cognite/cdffs/spec.py b/cognite/cdffs/spec.py index 6a5b4e8..ae0199b 100644 --- a/cognite/cdffs/spec.py +++ b/cognite/cdffs/spec.py @@ -2,6 +2,7 @@ import logging import time from concurrent.futures import ThreadPoolExecutor +from io import BytesIO from pathlib import Path from typing import Any, Dict, List, Optional, Tuple, Union @@ -19,7 +20,7 @@ from fsspec.caching import AllBytes from fsspec.spec import AbstractBufferedFile from fsspec.utils import DEFAULT_BLOCK_SIZE -from retry import retry +from tenacity import after_log, retry, stop_after_attempt, wait_fixed from .az_upload_strategy import AzureUploadStrategy from .credentials import get_connection_config @@ -167,7 +168,7 @@ def split_path(self, path: str, validate_suffix: bool = True) -> Tuple[str, str, elif self._suffix_exists(path): external_id_prefix = [x for x in Path(path).parts if Path(x).suffix][0] root_dir = path[: path.find(external_id_prefix)].strip("/") - external_id = path[path.find(external_id_prefix) :] + external_id = path[path.find(external_id_prefix) :] # noqa elif Path(path).parts and not validate_suffix: external_id_prefix = "" @@ -187,7 +188,11 @@ def cache_path(self, root_dir: str, external_id: str, file_size: int) -> None: file_size (int): File size (in bytes). 
""" inp_path = Path(root_dir, external_id) - file_meta = {"type": "file", "name": str(inp_path).lstrip("/"), "size": file_size} + file_meta = { + "type": "file", + "name": str(inp_path).lstrip("/"), + "size": file_size, + } parent_path = str(inp_path.parent).lstrip("/") if parent_path not in self.dircache: @@ -272,7 +277,10 @@ def _ls(self, root_dir: str, external_id_prefix: str, limit: int = -1) -> None: if not file_met.external_id: # Files are expected to have a valid external id. continue - inp_path = Path(file_met.directory if file_met.directory else "/", file_met.external_id) + inp_path = Path( + file_met.directory if file_met.directory else "/", + file_met.external_id, + ) file_name = str(inp_path).lstrip("/") file_size = int(file_met.metadata.get("size", -1)) if file_met.metadata else -1 file_meta = {"type": "file", "name": file_name, "size": file_size} @@ -280,7 +288,10 @@ def _ls(self, root_dir: str, external_id_prefix: str, limit: int = -1) -> None: # Add directory information. parent_path = str(inp_path.parent).lstrip("/") if parent_path not in directories: - directories[parent_path] = {"type": "directory", "name": parent_path.lstrip("/")} + directories[parent_path] = { + "type": "directory", + "name": parent_path.lstrip("/"), + } if file_name not in _file_write_cache: if parent_path not in self.dircache: @@ -508,7 +519,10 @@ def open( ) def read_file( - self, external_id: str, start_byte: Union[int, None] = None, end_byte: Union[int, None] = None + self, + external_id: str, + start_byte: Union[int, None] = None, + end_byte: Union[int, None] = None, ) -> Any: """Open and read the contents of a file. @@ -547,7 +561,11 @@ def read_file( raise FileNotFoundError from cognite_exp def cat( - self, path: Union[str, list], recursive: bool = False, on_error: str = "raise", **kwargs: Optional[Any] + self, + path: Union[str, list], + recursive: bool = False, + on_error: str = "raise", + **kwargs: Optional[Any], ) -> Union[bytes, Any, Dict[str, bytes]]: """Open and read the contents of a file(s). @@ -669,6 +687,10 @@ def __init__( ) self.write_strategy: UploadStrategy + if mode == "wb": + self.buffer = BytesIO() + self.offset = None + self.buffered = False if fs.upload_strategy == "google" and mode != "rb": self.write_strategy = GoogleUploadStrategy(self.file_metadata, self.cognite_client) @@ -676,6 +698,7 @@ def __init__( self.write_strategy = AzureUploadStrategy(self.file_metadata, self.cognite_client) elif mode != "rb": self.write_strategy = InMemoryUploadStrategy(self.file_metadata, self.cognite_client) + self.buffered = True # InMemoryUpload requires all data to be cached until last chunk comes super().__init__( fs, @@ -686,7 +709,6 @@ def __init__( **kwargs, ) - @retry(exceptions=_COMMON_EXCEPTIONS, tries=2) def _upload_chunk(self, final: bool = False) -> bool: """Upload file contents to CDF. @@ -700,14 +722,19 @@ def _upload_chunk(self, final: bool = False) -> bool: RuntimeError: When an unexpected error occurred. 
""" - @retry(tries=3, logger=logging.getLogger("upload_chunk")) + @retry( + reraise=False, + stop=stop_after_attempt(5), + wait=wait_fixed(0.5), + after=after_log(logging.getLogger("cdffs"), logging.INFO), + ) def strategy_submit(data: bytes, index: int) -> None: self.write_strategy.upload_chunk(data, index) buffer_length = len(self.buffer.getvalue()) - blocks = [self.buffer.getvalue()[i : i + self.blocksize] for i in range(0, buffer_length, self.blocksize)] - - logging.info(f"{len(blocks)} full blocks discovered") + blocks = [ + self.buffer.getvalue()[i : i + self.blocksize] for i in range(0, buffer_length, self.blocksize) # noqa + ] # Upload blocks in parallel using ThreadPoolExecutor with ThreadPoolExecutor(max_workers=5) as executor: @@ -719,11 +746,28 @@ def strategy_submit(data: bytes, index: int) -> None: # If it's the final block, then send a merge request if final: - total_size = self.write_strategy.merge_chunks() - self.cognite_client.files.update( - item=FileMetadataUpdate(external_id=self.external_id).metadata.add({"size": total_size}) + @retry( + stop=stop_after_attempt(5), + wait=wait_fixed(0.5), + after=after_log(logging.getLogger("cdffs"), logging.INFO), ) + def safe_merge() -> int: + return self.write_strategy.merge_chunks() + + total_size = safe_merge() + + @retry( + stop=stop_after_attempt(5), + wait=wait_fixed(0.5), + after=after_log(logging.getLogger("cdffs"), logging.INFO), + ) + def safe_file_update(_size: int) -> None: + self.cognite_client.files.update( + item=FileMetadataUpdate(external_id=self.external_id).metadata.add({"size": _size}) + ) + + safe_file_update(total_size) self.fs.cache_path( self.root_dir, @@ -731,7 +775,7 @@ def strategy_submit(data: bytes, index: int) -> None: total_size, ) - return final + return final if self.buffered else True # tell fsspec to cache buffer or not def _fetch_range(self, start: int, end: int) -> Any: """Read file contents from CDF. 
diff --git a/poetry.lock b/poetry.lock index a2b77c2..2aff824 100644 --- a/poetry.lock +++ b/poetry.lock @@ -535,17 +535,6 @@ diagnostics = ["bokeh (>=2.4.2)", "jinja2 (>=2.10.3)"] distributed = ["distributed (==2023.10.0)"] test = ["pandas[test]", "pre-commit", "pytest", "pytest-cov", "pytest-rerunfailures", "pytest-timeout", "pytest-xdist"] -[[package]] -name = "decorator" -version = "5.1.1" -description = "Decorators for Humans" -optional = false -python-versions = ">=3.5" -files = [ - {file = "decorator-5.1.1-py3-none-any.whl", hash = "sha256:b8c3f85900b9dc423225913c5aace94729fe1fa9763b38939a95226f02d37186"}, - {file = "decorator-5.1.1.tar.gz", hash = "sha256:637996211036b6385ef91435e4fae22989472f9d571faba8927ba8253acbc330"}, -] - [[package]] name = "distlib" version = "0.3.7" @@ -1403,17 +1392,6 @@ files = [ {file = "protobuf-4.24.4.tar.gz", hash = "sha256:5a70731910cd9104762161719c3d883c960151eea077134458503723b60e3667"}, ] -[[package]] -name = "py" -version = "1.11.0" -description = "library with cross-python path, ini-parsing, io, code, log facilities" -optional = false -python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*" -files = [ - {file = "py-1.11.0-py2.py3-none-any.whl", hash = "sha256:607c53218732647dff4acdfcd50cb62615cedf612e72d1724fb1a0cc6405b378"}, - {file = "py-1.11.0.tar.gz", hash = "sha256:51c75c4126074b472f746a24399ad32f6053d1b34b68d2fa41e558e6f4a98719"}, -] - [[package]] name = "pyarrow" version = "13.0.0" @@ -1944,21 +1922,6 @@ urllib3 = ">=1.25.10,<3.0" [package.extras] tests = ["coverage (>=6.0.0)", "flake8", "mypy", "pytest (>=7.0.0)", "pytest-asyncio", "pytest-cov", "pytest-httpserver", "tomli", "tomli-w", "types-requests"] -[[package]] -name = "retry" -version = "0.9.2" -description = "Easy to use retry decorator." 
-optional = false -python-versions = "*" -files = [ - {file = "retry-0.9.2-py2.py3-none-any.whl", hash = "sha256:ccddf89761fa2c726ab29391837d4327f819ea14d244c232a1d24c67a2f98606"}, - {file = "retry-0.9.2.tar.gz", hash = "sha256:f8bfa8b99b69c4506d6f5bd3b0aabf77f98cdb17f3c9fc3f5ca820033336fba4"}, -] - -[package.dependencies] -decorator = ">=3.4.2" -py = ">=1.4.26,<2.0.0" - [[package]] name = "rfc3986" version = "2.0.0" @@ -2284,6 +2247,20 @@ Sphinx = ">=5" lint = ["docutils-stubs", "flake8", "mypy"] test = ["pytest"] +[[package]] +name = "tenacity" +version = "8.2.3" +description = "Retry code until it succeeds" +optional = false +python-versions = ">=3.7" +files = [ + {file = "tenacity-8.2.3-py3-none-any.whl", hash = "sha256:ce510e327a630c9e1beaf17d42e6ffacc88185044ad85cf74c0a8887c6a0f88c"}, + {file = "tenacity-8.2.3.tar.gz", hash = "sha256:5398ef0d78e63f40007c1fb4c0bff96e1911394d2fa8d194f77619c05ff6cc8a"}, +] + +[package.extras] +doc = ["reno", "sphinx", "tornado (>=4.5)"] + [[package]] name = "toml" version = "0.10.2" @@ -2364,17 +2341,6 @@ files = [ [package.dependencies] urllib3 = ">=2" -[[package]] -name = "types-retry" -version = "0.9.9.4" -description = "Typing stubs for retry" -optional = false -python-versions = "*" -files = [ - {file = "types-retry-0.9.9.4.tar.gz", hash = "sha256:e4731dc684b56b875d9746459ad665d3bc281a56b530acdf1c97730167799941"}, - {file = "types_retry-0.9.9.4-py3-none-any.whl", hash = "sha256:f29760a9fe8b1fefe253e5fe6be7e4c0eba243932c600e0eccffb42a21d17765"}, -] - [[package]] name = "typing-extensions" version = "4.8.0" @@ -2496,4 +2462,4 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "p [metadata] lock-version = "2.0" python-versions = ">=3.9.10,<3.13" -content-hash = "aff09ac278fe886d9a8dc5061cbe24a3beb8cbb6f1014aa6311269081f8f95be" +content-hash = "a208da8a88bd3c62e53ff1579396980abe9b788563dd6595d3f82f611b9422b0" diff --git a/pyproject.toml b/pyproject.toml index c0ab2bc..dba28f2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "cognite-cdffs" -version = "0.2.9" +version = "0.2.10" description = "File System Interface for CDF Files" license = "Apache-2.0" authors = ["Infant Alex "] @@ -75,7 +75,7 @@ twine = "^4.0.2" pydantic = "^2.4.2" python-dotenv = "^1.0.0" pydantic-settings = "^2.0.3" -retry = "^0.9.2" +tenacity = "^8.2.3" [tool.poetry.dev-dependencies]
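
For readers who don't want to trace the diff, the sketch below illustrates the two mechanisms this patch relies on: each flushed block is uploaded under a tenacity retry policy (`stop_after_attempt`, `wait_fixed`, `after_log`), and `_upload_chunk` now returns `True` for the native multipart strategies so that fsspec's `AbstractBufferedFile` resets its internal write buffer after every flush instead of caching the whole file. This is an illustrative sketch, not cdffs source: `SketchFile`, `upload_block`, the placeholder upload URL, and the plain `requests.put` call are invented stand-ins for the real upload strategies.

```python
# Illustrative sketch only: SketchFile, upload_block, the placeholder URL and the
# plain requests.put call stand in for the real cdffs upload strategies.
import logging

import requests
from fsspec.spec import AbstractBufferedFile
from tenacity import after_log, retry, stop_after_attempt, wait_fixed

logger = logging.getLogger("cdffs")


@retry(
    reraise=False,                          # raise tenacity.RetryError once attempts run out
    stop=stop_after_attempt(5),             # at most 5 attempts per block
    wait=wait_fixed(0.5),                   # 0.5 s between attempts
    after=after_log(logger, logging.INFO),  # log every failed attempt
)
def upload_block(upload_url: str, data: bytes) -> None:
    """Upload one block; transient HTTP errors are retried by tenacity."""
    response = requests.put(upload_url, data=data, timeout=60)
    response.raise_for_status()


class SketchFile(AbstractBufferedFile):
    """Mirrors the return-value logic of CdfFile._upload_chunk after this patch."""

    buffered = False  # True only for the single-request in-memory strategy
    upload_url = "https://example.invalid/upload"  # placeholder

    def _initiate_upload(self) -> None:
        pass  # a real strategy would negotiate the multipart upload session here

    def _upload_chunk(self, final: bool = False) -> bool:
        data = self.buffer.getvalue()
        blocks = [data[i : i + self.blocksize] for i in range(0, len(data), self.blocksize)]
        for block in blocks:
            upload_block(self.upload_url, block)
        # True  -> fsspec discards self.buffer after every flush, so large files
        #          no longer accumulate in memory (the cache bug fixed here).
        # final -> fsspec keeps buffering until the last flush, which the
        #          in-memory (whole-file) strategy still relies on.
        return final if self.buffered else True

    def _fetch_range(self, start: int, end: int) -> bytes:
        return b""  # reads are out of scope for this sketch
```

The in-memory strategy keeps returning `final` (i.e. `False` until the last flush) because it still needs the complete buffer to issue a single upload request, which is why the patch sets `buffered = True` only for that strategy.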