Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor duplicate task #86

Merged
merged 1 commit into from
Sep 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion storage-app/src/router/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ async def upload_file(
file_id, _status = await project_bucket.put_object(file, file_meta)

if _status == status.HTTP_201_CREATED:
produce_handle_media_task.delay(bucket_name, file_id)
produce_handle_media_task.apply_async((bucket_name, file_id), countdown=10)

return JSONResponse(status_code=_status, content={"result": file_id})

Expand Down
14 changes: 10 additions & 4 deletions storage-app/src/shared/worker_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,17 @@


class EmbeddingStatus(Enum):
DUPLICATE = "v"
DUPLICATE = "u"
VALIDATION = "v"
REBOUND = "r"

def __json__(self, *args, **kwargs): return self.name

def to_value(self):
match self.value:
case "u": return "v"
case _: return self.value


class Zipper:
written: bool = False
Expand Down Expand Up @@ -160,9 +165,9 @@ def search_similar(self):
with EmbeddingStorage() as storage:
self.result = storage.search(self.embedding, self.project_id)
self.status = (
EmbeddingStatus.VALIDATION
if not self.result else
EmbeddingStatus.DUPLICATE
if len(self.result)
else EmbeddingStatus.VALIDATION
)

def handle_search_result(self):
Expand All @@ -174,6 +179,7 @@ def handle_search_result(self):

with EmbeddingStorage() as storage:
if self.status == EmbeddingStatus.DUPLICATE:
print("HERE")
rmi: Optional[str] = None
rmd: float = float("inf")

Expand Down Expand Up @@ -216,7 +222,7 @@ def send_update(
payload_token = emit_token({"minutes": 1}, SECRET_KEY, SECRET_ALGO)
payload = {}

if status: payload["status"] = status.value
if status: payload["status"] = status.to_value()
if rebound: payload["rebound"] = rebound

if not status and not rebound: return
Expand Down
Loading