Commit

Merge pull request #20 from meteoblue/19-filecache-rename-sometimes-returns-a-filenotfounderror

19 filecache rename sometimes returns a filenotfounderror
nicoladd authored Aug 26, 2024
2 parents da5da4d + 7df8830 commit 6ef8131
Showing 4 changed files with 34 additions and 22 deletions.
3 changes: 2 additions & 1 deletion meteoblue_dataset_sdk/caching/abstractcache.py
@@ -1,4 +1,5 @@
from abc import ABC, abstractmethod
from typing import Optional


class AbstractCache(ABC):
@@ -18,7 +19,7 @@ async def set(self, key: str, value: bytes) -> None:
raise NotImplementedError("Method must be implemented")

@abstractmethod
async def get(self, key: str) -> bytes:
async def get(self, key: str) -> Optional[bytes]:
"""
Retrieve a value from the cache
:param key: Key used to store the query to retrieve.
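
The change above makes a cache miss explicit in the type: get() may now return None instead of always promising bytes. As a purely hypothetical illustration (not part of this commit, and assuming set() and get() are the only abstract methods a subclass must provide), a minimal in-memory implementation of the interface could look like this:

from typing import Dict, Optional

from meteoblue_dataset_sdk.caching.abstractcache import AbstractCache


class MemoryCache(AbstractCache):
    """Hypothetical dict-backed cache illustrating the Optional[bytes] contract."""

    def __init__(self) -> None:
        self._store: Dict[str, bytes] = {}

    async def set(self, key: str, value: bytes) -> None:
        self._store[key] = value

    async def get(self, key: str) -> Optional[bytes]:
        # None signals a cache miss, matching the updated return type.
        return self._store.get(key)
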
34 changes: 22 additions & 12 deletions meteoblue_dataset_sdk/caching/filecache.py
@@ -3,10 +3,11 @@
import tempfile
import zlib
from pathlib import Path
from random import randrange
from typing import Optional

import aiofiles
import aiofiles.os
import aiofiles # type: ignore
import aiofiles.os # type: ignore

from .abstractcache import AbstractCache

@@ -53,33 +54,42 @@ async def set(self, key: str, value: bytes) -> None:
cache_file_path = self._hash_to_path(key)

try:
await aiofiles.os.stat(cache_file_path.parent)
await aiofiles.os.stat(cache_file_path.parent) # type: ignore
except FileNotFoundError:
await aiofiles.os.mkdir(cache_file_path.parent)
await aiofiles.os.mkdir(cache_file_path.parent) # type: ignore

if await self._is_cached_file_valid(cache_file_path):
return
temp_file_path = f"{cache_file_path}~"

# add random number to temporary file name to mitigate possible race conditions
# from multiple processes writing to exactly the same file
random_number = randrange(1000000000)
temp_file_path = f"{cache_file_path}_{random_number}~"
async with aiofiles.open(temp_file_path, "wb") as file:
await file.write(zlib.compress(value, self.compression_level))
await aiofiles.os.rename(temp_file_path, cache_file_path)

try:
await aiofiles.os.rename(temp_file_path, cache_file_path)
except FileNotFoundError:
# do not raise error, just continue
return

async def get(self, key: str) -> Optional[bytes]:
if not key:
return
return # type: ignore
file_path = self._hash_to_path(key)

if not await self._is_cached_file_valid(file_path):
return
return # type: ignore
try:
async with aiofiles.open(file_path, "rb") as f:
return zlib.decompress(await f.read())
except (OSError, IOError) as e:
logging.error(f"error while reading the file {file_path}", e)
return
return # type: ignore

async def _is_cached_file_valid(self, file_path: Path) -> bool:
if not file_path.exists():
async def _is_cached_file_valid(self, file_path: Optional[Path]) -> bool:
if file_path is None or not file_path.exists():
return False
stats = await aiofiles.os.stat(file_path)
file_modification_timestamp = int(stats.st_mtime)
@@ -95,5 +105,5 @@ def _hash_to_path(self, key_hash: str) -> Optional[Path]:
as the filename.
"""
if not key_hash:
return
return # type: ignore
return Path(self.cache_path, key_hash[:3], key_hash[3:])
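
Stripped of the class around it, the write path this patch arrives at follows a common pattern: compress into a uniquely named temporary file, atomically rename it into place, and treat a FileNotFoundError from the rename as a race that another writer already won. A standalone sketch of that pattern (hypothetical names, simplified directory handling, not the SDK's own code) could look like this:

import zlib
from pathlib import Path
from random import randrange

import aiofiles
import aiofiles.os


async def write_cache_entry(cache_file_path: Path, value: bytes, compression_level: int = 6) -> None:
    # Simplification: the SDK checks the directory with aiofiles.os.stat/mkdir instead.
    cache_file_path.parent.mkdir(parents=True, exist_ok=True)
    # Random suffix so concurrent writers never share a temporary file.
    temp_file_path = f"{cache_file_path}_{randrange(1000000000)}~"
    async with aiofiles.open(temp_file_path, "wb") as file:
        await file.write(zlib.compress(value, compression_level))
    try:
        await aiofiles.os.rename(temp_file_path, cache_file_path)
    except FileNotFoundError:
        # Another writer already renamed or cleaned up the file; nothing left to do.
        pass

On POSIX filesystems the rename replaces any existing destination atomically, so concurrent writers converge on one complete cache file rather than a partially written one.
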
17 changes: 9 additions & 8 deletions meteoblue_dataset_sdk/client.py
@@ -2,17 +2,18 @@
meteoblue dataset client
"""

import aiohttp
import asyncio
import copy
import hashlib
import json
import logging
from contextlib import asynccontextmanager
from typing import Optional

import aiohttp

from .protobuf.dataset_pb2 import DatasetApiProtobuf
from .protobuf.measurements_pb2 import MeasurementApiProtobuf
from .caching.filecache import FileCache
from .protobuf.dataset_pb2 import DatasetApiProtobuf # type: ignore
from .protobuf.measurements_pb2 import MeasurementApiProtobuf # type: ignore
from .utils import run_async


@@ -53,7 +54,7 @@ def __init__(self, message):


class Client(object):
def __init__(self, apikey: str, cache=None):
def __init__(self, apikey: str, cache: Optional[FileCache] = None):
self._config = ClientConfig(apikey)
self.cache = cache

@@ -63,8 +64,8 @@ async def _fetch(
session: aiohttp.ClientSession,
method: str,
url: str,
body_dict: dict = None,
query_params: dict = None,
body_dict: Optional[dict] = None,
query_params: Optional[dict] = None,
):
"""
Fetch data from an URL and try for error 5xx or timeouts.
@@ -210,7 +211,7 @@ async def query(self, params: dict):
params = copy.copy(params)
params["format"] = "protobuf"
cache_key = ""
if self.cache:
if self.cache is not None:
cache_key = self._hash_params(params)
cached_query_results = await self.cache.get(cache_key)
if cached_query_results:
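
The client.py changes above type the cache parameter as Optional[FileCache] and guard it with an explicit "is not None" check before use. A hedged usage sketch, assuming FileCache can be constructed with its default arguments (its constructor is not shown in this diff), might look like:

import asyncio

import meteoblue_dataset_sdk
from meteoblue_dataset_sdk.caching.filecache import FileCache


async def main() -> None:
    # Assumption: FileCache() accepts defaults; pass cache=None to disable caching entirely.
    client = meteoblue_dataset_sdk.Client(apikey="YOUR_API_KEY", cache=FileCache())
    # result = await client.query({...})  # query parameters elided here


asyncio.run(main())
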
2 changes: 1 addition & 1 deletion setup.py
@@ -1,4 +1,4 @@
from setuptools import setup
from setuptools import setup # type: ignore

setup(
use_scm_version=True,
