From b32b5cf65c6c2228b6517768c813a5e4c2dee5d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Bournhonesque?= Date: Wed, 11 Dec 2024 12:34:12 +0100 Subject: [PATCH] fix: fix bug in directory selection for proof image (#616) --- open_prices/proofs/tests.py | 37 +++++++++++++++++++++++++++++++++- open_prices/proofs/utils.py | 40 +++++++++++++++++++++++++------------ 2 files changed, 63 insertions(+), 14 deletions(-) diff --git a/open_prices/proofs/tests.py b/open_prices/proofs/tests.py index 4a3829e1..efb25353 100644 --- a/open_prices/proofs/tests.py +++ b/open_prices/proofs/tests.py @@ -20,7 +20,7 @@ run_and_save_proof_prediction, ) from open_prices.proofs.models import Proof -from open_prices.proofs.utils import fetch_and_save_ocr_data +from open_prices.proofs.utils import fetch_and_save_ocr_data, select_proof_image_dir LOCATION_OSM_NODE_652825274 = { "type": location_constants.TYPE_OSM, @@ -473,3 +473,38 @@ def test_run_and_save_proof_prediction_proof(self): proof_type_prediction.delete() price_tag_prediction.delete() proof.delete() + + +class TestSelectProofImageDir(TestCase): + def test_select_proof_image_dir_no_dir(self): + with tempfile.TemporaryDirectory() as tmpdir: + images_dir = Path(tmpdir) / "images" + images_dir.mkdir() + selected_dir = select_proof_image_dir(images_dir) + self.assertEqual(selected_dir, images_dir / "0001") + + def test_select_proof_image_dir_existing_dir(self): + with tempfile.TemporaryDirectory() as tmpdir: + images_dir = Path(tmpdir) / "images" + images_dir.mkdir() + (images_dir / "0001").mkdir() + selected_dir = select_proof_image_dir(images_dir) + self.assertEqual(selected_dir, images_dir / "0001") + + def test_select_proof_image_dir_existing_dir_second_dir(self): + with tempfile.TemporaryDirectory() as tmpdir: + images_dir = Path(tmpdir) / "images" + images_dir.mkdir() + (images_dir / "0001").mkdir() + (images_dir / "0002").mkdir() + selected_dir = select_proof_image_dir(images_dir) + self.assertEqual(selected_dir, 
images_dir / "0002") + + def test_select_proof_image_dir_existing_dir_create_new_dir(self): + with tempfile.TemporaryDirectory() as tmpdir: + images_dir = Path(tmpdir) / "images" + images_dir.mkdir() + (images_dir / "0001").mkdir() + (images_dir / "0001" / "0001.jpg").touch() + selected_dir = select_proof_image_dir(images_dir, max_images_per_dir=1) + self.assertEqual(selected_dir, images_dir / "0002") diff --git a/open_prices/proofs/utils.py b/open_prices/proofs/utils.py index a1133b54..01b49d60 100644 --- a/open_prices/proofs/utils.py +++ b/open_prices/proofs/utils.py @@ -110,17 +110,7 @@ def store_file( # We store the images in directories containing up to 1000 images # Once we reach 1000 images, we create a new directory by increasing the directory ID # noqa # This is used to prevent the base image directory from containing too many files # noqa - images_dir = settings.IMAGES_DIR - current_dir_id = max( - (int(p.name) for p in images_dir.iterdir() if p.is_dir() and p.name.isdigit()), - default=1, - ) - current_dir_id_str = f"{current_dir_id:04d}" - current_dir = images_dir / current_dir_id_str - if current_dir.exists() and len(list(current_dir.iterdir())) >= 1_000: - # if the current directory contains 1000 images, we create a new one - current_dir_id += 1 - current_dir = images_dir / str(current_dir_id) + current_dir = select_proof_image_dir(settings.IMAGES_DIR) current_dir.mkdir(exist_ok=True, parents=True) file_full_path = generate_full_path(current_dir, file_stem, extension) # write the content of the file to the new file @@ -128,13 +118,37 @@ def store_file( f.write(file.file.read()) # create a thumbnail image_thumb_path = generate_thumbnail( - current_dir, current_dir_id_str, file_stem, extension, mimetype + current_dir, current_dir.name, file_stem, extension, mimetype ) # Build file_path - file_path = generate_relative_path(current_dir_id_str, file_stem, extension) + file_path = generate_relative_path(current_dir.name, file_stem, extension) return 
(file_path, mimetype, image_thumb_path) +def select_proof_image_dir(images_dir: Path, max_images_per_dir: int = 1_000) -> Path: + """Select the directory in which to store the image. + + We move on to a new directory once the current one contains at least + max_images_per_dir entries. Directory names are 4-digit numbers from 0001. + + :param images_dir: the directory where the images are stored + :param max_images_per_dir: the maximum number of images per directory + :return: the selected directory + """ + current_dir_id = max( + (int(p.name) for p in images_dir.iterdir() if p.is_dir() and p.name.isdigit()), + default=1, + ) + current_dir_id_str = f"{current_dir_id:04d}" + current_dir = images_dir / current_dir_id_str + if current_dir.exists() and len(list(current_dir.iterdir())) >= max_images_per_dir: + # the current directory is full, so we select the next directory ID + current_dir_id += 1 + current_dir_id_str = f"{current_dir_id:04d}" + current_dir = images_dir / current_dir_id_str + return current_dir + + def run_ocr_on_image(image_path: Path | str, api_key: str) -> dict[str, Any] | None: """Run Google Cloud Vision OCR on the image stored at the given path.