From 59b6c7313c34fbd5efbd00a85cdfbedea466f981 Mon Sep 17 00:00:00 2001 From: Mitry Date: Thu, 12 Dec 2024 11:34:47 -0300 Subject: [PATCH] ref worker --- storage-app/src/shared/worker_services.py | 8 +++++--- storage-app/src/worker.py | 4 ++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/storage-app/src/shared/worker_services.py b/storage-app/src/shared/worker_services.py index 231ed4c..2576b3c 100644 --- a/storage-app/src/shared/worker_services.py +++ b/storage-app/src/shared/worker_services.py @@ -41,7 +41,7 @@ def __init__( file_ids: list[str], task: Task ) -> None: - self.object_set = Bucket(bucket_name).get_download_objects(file_ids) + self.file_ids = file_ids self.bucket_name = bucket_name self._task = task @@ -49,10 +49,11 @@ def __init__( async def archive_objects(self) -> Optional[bool]: json_data: Any = ("annotation.json", dumps(self.annotation, indent=4).encode("utf-8")) + object_set = Bucket(self.bucket_name).get_download_objects(self.file_ids) queue = Queue() - producer = FileProducer(self.object_set, queue, MAX_CONCURENT) + producer = FileProducer(object_set, queue, MAX_CONCURENT) writer = ZipWriter(f"{self.bucket_name}_dataset") consumer = ZipConsumer(queue, [json_data], writer) @@ -68,11 +69,12 @@ async def archive_objects(self) -> Optional[bool]: wait_list = wait_list.next continue + print(f"ZIP WORK STALL, {producer.iter_count}") self._task.update_state(state="PROGRESS") await async_stall_for(5) await producer_task - await self.object_set.close() + await object_set.close() consumer.join() writer.join() diff --git a/storage-app/src/worker.py b/storage-app/src/worker.py index 7cc6bd8..4dd70e4 100644 --- a/storage-app/src/worker.py +++ b/storage-app/src/worker.py @@ -1,7 +1,7 @@ from celery import Celery from shared.settings import BROKER_URL, RESULT_URL, CELERY_CONFIG from shared.worker_services import Zipper, Hasher, EmbeddingStatus -from asyncio import get_event_loop +from asyncio import run from typing import Optional, Any from json import JSONEncoder, loads, dumps from kombu.serialization import register @@ -35,7 +35,7 @@ def default(self, o) -> Any: return getattr(o, "__json__", super().default)(o) ) def produce_download_task(self, bucket_name: str, file_ids: list[str]) -> str | None: task = Zipper(bucket_name, file_ids, self) - get_event_loop().run_until_complete(task.archive_objects()) + run(task.archive_objects()) return task.archive_id