Commit: merge

githubering182 committed Dec 17, 2024
2 parents 24aab0c + 20c7d8a commit f893a9d
Showing 8 changed files with 259 additions and 58 deletions.
1 change: 1 addition & 0 deletions backend-app/file/file_tests/serializers_test.py
@@ -33,6 +33,7 @@ def test_file_serializer(self):
set(data.keys()),
data_keys.union({
"rebound",
"rebound_project",
"related_duplicates",
'upload_date',
"update_date",
3 changes: 3 additions & 0 deletions backend-app/file/models.py
@@ -2,6 +2,7 @@
Model,
CharField,
DateTimeField,
IntegerField,
BooleanField,
ForeignKey,
DO_NOTHING,
@@ -30,6 +31,8 @@ class File(Model):
project: ForeignKey = ForeignKey("project.Project", on_delete=DO_NOTHING)
author: ForeignKey = ForeignKey("user.CustomUser", on_delete=DO_NOTHING)
rebound: ForeignKey = ForeignKey("self", on_delete=DO_NOTHING, null=True)
# TODO: temporary field to remap the storage bucket when a file is migrated from one project to another
rebound_project = IntegerField(null=True)
validator: ForeignKey = ForeignKey(
"user.CustomUser",
null=True,
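
A small hedged illustration of how the new rebound_project field changes bucket resolution after a migration; the "project_<id>" bucket naming mirrors the frontend change below, and the helper itself is hypothetical, not part of this commit:

def storage_bucket_for(file) -> str:
    # Files moved between projects keep serving from their original bucket
    # until their stored objects are physically migrated.
    project_id = file.rebound_project or file.project_id
    return f"project_{project_id}"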
4 changes: 2 additions & 2 deletions frontend-app/src/components/ui/FileMedia/index.jsx
@@ -119,11 +119,11 @@ function FileMedia({ files, slide, pathID }, ref) {
const setFile = (token) => {
resetZoom();
if (!files[slide]) return;
var { id, file_type, rebound } = files[slide];
var { id, file_type, rebound, rebound_project } = files[slide];
setTypeVideo(file_type === 'video');

var baseUrl = `${getOriginDomain()}:9000/api/storage`;
var queryUrl = `project_${pathID}/${rebound || id}/`;
var queryUrl = `project_${rebound_project || pathID}/${rebound || id}/`;

setFileUrl(`${baseUrl}/${queryUrl}?access=${token || tempFileToken}`);
setMark(rebound ? "DUPLICATE" : "");
33 changes: 33 additions & 0 deletions scripts/file_project_rebound.py
@@ -0,0 +1,33 @@
from django.db import connection, transaction
from file.models import File


def migrate_files(
target_project: int,
dest_project: int,
id_mapping: list[tuple[int, int]]
):
with transaction.atomic():
files = File.objects.filter(project_id=target_project)

assert files.update(project_id=dest_project, rebound_project=target_project), "No File to update"

update_attributes_query = """
update attribute_group_attribute
set attribute_id = %s
where attributegroup_id in %s and attribute_id = %s;
"""

ag_id_list = tuple([
str(uid) for uid
in files.values_list("attributegroup__uid", flat=True).distinct()
])

query_values = [
(dest_id, ag_id_list, target_id)
for target_id, dest_id
in id_mapping
]

with connection.cursor() as cursor:
cursor.executemany(update_attributes_query, query_values)
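
A minimal usage sketch for the new migration helper, assuming the script is importable from a Django shell; the project IDs and attribute-ID pairs below are placeholders, not values from this repository:

from file_project_rebound import migrate_files

# Move every File from project 12 to project 34, remembering the old
# project in rebound_project so the storage bucket can still be resolved.
migrate_files(
    target_project=12,
    dest_project=34,
    id_mapping=[
        (101, 201),  # attribute 101 in project 12 maps to attribute 201 in project 34
        (102, 202),
    ],
)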
177 changes: 172 additions & 5 deletions storage-app/src/shared/archive_helpers.py
@@ -1,8 +1,9 @@
from asyncio import create_task, wait, FIRST_COMPLETED, gather
from asyncio import FIRST_COMPLETED, create_task, wait
from typing import Optional
from threading import Thread
from queue import Queue
from gridfs import GridOut, GridIn
from gridfs.synchronous.grid_file import Cursor
from shared.storage_db import SyncDataBase
from motor.core import AgnosticBaseCursor
from gc import collect as gc_collect
@@ -67,9 +68,9 @@ def run(self):

buffer, read_size = task
buffer.seek(0)
v = buffer.read(read_size)
data = buffer.read(read_size)

dest.write(v)
dest.write(data)

del buffer

@@ -230,7 +231,7 @@ def run(self):
class FileProducer:
def __init__(
self,
object_set: AgnosticBaseCursor,
object_set: AgnosticBaseCursor | Cursor,
queue: Queue,
max_concurrent: int
):
@@ -243,6 +244,16 @@ def __init__(
@property
def ready(self) -> bool: return self._done

def produce_sync(self):
for file in self.object_set:
self.queue.put((self._get_file_name(file), file.read()))
self.iter_count += 1
file.close()
if not self.iter_count % GC_FREQ: gc_collect()

self.queue.put(None)
self._done = True

async def produce(self):
tasks = []

@@ -257,7 +268,7 @@ async def produce(self):
self.iter_count += 1
if not self.iter_count % GC_FREQ: gc_collect()

await gather(*tasks)
for task in tasks: await task
self.queue.put(None)

self._done = True
@@ -273,3 +284,159 @@ def _get_file_name(self, file: GridOut) -> str:
if extension: name += f".{extension}"

return name


class SyncZipping:
DUMP_THRESHOLD: int = 10 << 20

def __init__(
self,
dest_name: str,
object_set: Cursor,
additional: list[tuple[str, bytes]]
):
self.object_set = object_set
self.additional = additional
self.file_list = []
self._local_dir_end = 0
self._archive_id = None
self.dest = SyncDataBase \
.get_fs_bucket(TEMP_BUCKET) \
.open_upload_stream(
dest_name,
metadata={"created_at": datetime.now().isoformat()}
)

def dest_write(self, buffer, read_size):
buffer.seek(0)
data = buffer.read(read_size)
self.dest.write(data)

def tell(self) -> int: return self._local_dir_end

def result(self) -> Optional[str]: return self._archive_id

def _dump_buffer(self, buffer: BytesIO, zip_buffer: ZipFile):
dest_offset = self.tell()

new_list = zip_buffer.filelist
for zinfo in new_list: zinfo.header_offset += dest_offset

self.file_list += new_list
self._local_dir_end += buffer.tell()

self.dest_write(buffer, buffer.tell())

zip_buffer.close()

def _finalize(self):
self._write_end_record(end_buffer := BytesIO())
self.dest_write(end_buffer, end_buffer.tell())

self._write_cent_dir(
self.tell() + end_buffer.tell(),
self.tell(),
len(self.file_list),
cent_dir_buffer := BytesIO()
)
self.dest_write(cent_dir_buffer, cent_dir_buffer.tell())

self._archive_id = self.dest._id

self.dest.close()

SyncDataBase.close_connection()

def _write_end_record(self, buffer: BytesIO):
for zinfo in self.file_list:
dt = zinfo.date_time

dosdate = (dt[0] - 1980) << 9 | dt[1] << 5 | dt[2]
dostime = dt[3] << 11 | dt[4] << 5 | (dt[5] // 2)
extra = []

assert zinfo.file_size <= ZIP64_LIMIT and zinfo.compress_size <= ZIP64_LIMIT

file_size = zinfo.file_size
compress_size = zinfo.compress_size

if zinfo.header_offset > ZIP64_LIMIT:
extra.append(zinfo.header_offset)
header_offset = 0xffffffff
else: header_offset = zinfo.header_offset

extra_data = zinfo.extra
min_version = 0

if extra:
extra_data = _Extra.strip(extra_data, (1,))
extra_data = pack_data("<HH" + "Q" * len(extra), 1, 8 * len(extra), *extra) + extra_data

min_version = ZIP64_VERSION

extract_version = max(min_version, zinfo.extract_version)
create_version = max(min_version, zinfo.create_version)

filename, flag_bits = zinfo._encodeFilenameFlags()

centdir = pack_data(
CENTRAL_STRUCT,
CENTRAL_STRING,
create_version,
zinfo.create_system,
extract_version,
zinfo.reserved,
flag_bits,
zinfo.compress_type,
dostime,
dosdate,
zinfo.CRC,
compress_size,
file_size,
len(filename),
len(extra_data),
len(zinfo.comment),
0,
zinfo.internal_attr,
zinfo.external_attr,
header_offset
)

buffer.write(centdir + filename + extra_data + zinfo.comment)

def _write_cent_dir(self, pos: int, start_dir: int, d_size: int, buffer: BytesIO):
cent_dir = pos - start_dir

if d_size > ZIP_FILECOUNT_LIMIT or pos > ZIP64_LIMIT:
pack = (END_64_STRUCT, END_64_STRING, 44, 45, 45, 0, 0, d_size, d_size, 0, pos)
buffer.write(pack_data(*pack))
buffer.write(pack_data(END_64_STRUCT_LOC, END_64_STRING_LOC, 0, pos, 1))
cent_dir = min(cent_dir, 0xFFFFFFFF)
start_dir = min(start_dir, 0xFFFFFFFF)
d_size = min(d_size, 0xFFFF)

buffer.write(pack_data(END_STRUCT, END_STRING, 0, 0, d_size, d_size, cent_dir, start_dir, 0))

def run(self):
buffer = BytesIO()
zip_buffer: ZipFile = ZipFile(buffer, "w", ZIP_DEFLATED)

for file in self.object_set:
f_name, ext = str(file._id), file.metadata.get("file_extension", "")
if ext: f_name += f".{ext}"

f_data = file.read()

zip_buffer.writestr(f_name, f_data)

if buffer.tell() > self.DUMP_THRESHOLD:
self._dump_buffer(buffer, zip_buffer)
buffer = BytesIO()
zip_buffer = ZipFile(buffer, "w", ZIP_DEFLATED)

for f_name, f_data in self.additional: zip_buffer.writestr(f_name, f_data)

if buffer.tell(): self._dump_buffer(buffer, zip_buffer)

self.object_set.close()
self._finalize()
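
A hedged sketch of how SyncZipping might be driven end to end; the bucket name, query, and file names are illustrative assumptions, while get_fs_bucket and the constructor arguments follow the code shown above:

from shared.storage_db import SyncDataBase
from shared.archive_helpers import SyncZipping

# Synchronous GridFS cursor over the files to archive (bucket name is hypothetical)
object_set = SyncDataBase.get_fs_bucket("project_42").find({})

zipper = SyncZipping(
    dest_name="project_42_export.zip",
    object_set=object_set,
    additional=[("manifest.json", b"{}")],  # extra in-memory entries written last
)
zipper.run()
archive_id = zipper.result()  # GridFS _id of the written archive, or None if nothing was finalized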
30 changes: 4 additions & 26 deletions storage-app/src/shared/hasher.py
@@ -1,5 +1,3 @@
from asyncio.events import new_event_loop
from typing_extensions import Coroutine
from videohash import VideoHash
from videohash.utils import (
create_and_return_temporary_directory as mk_temp_dir,
@@ -9,29 +7,15 @@
from PIL.ImageFile import ImageFile
from os.path import join, sep, exists
from pathlib import Path
from asyncio import get_event_loop, set_event_loop
from motor.motor_asyncio import AsyncIOMotorGridOut
from shared.settings import HASH_SIZE, TEMP_HASH_PATH, MEDIA_SIZE
from numpy import asarray, float32, ndarray
from io import BytesIO
from scipy.fftpack import dct
from typing import Any

Image.ANTIALIAS = Image.Resampling.LANCZOS


def run_with_loop(f: Coroutine) -> Any:
current_loop = get_event_loop()
new_loop = new_event_loop()

try:
set_event_loop(new_loop)
return new_loop.run_until_complete(f)
finally:
new_loop.close()
set_event_loop(current_loop)


def to_embedding(
image: ImageFile,
embedding_type="dctlowfreq",
@@ -61,13 +45,10 @@ def __init__(self, file: AsyncIOMotorGridOut):
self.embedding = self._get_hash()

def _get_hash(self) -> ndarray:
get_event_loop().run_until_complete(self._get_buffer())
image = Image.open(self._buffer)
return to_embedding(image)

async def _get_buffer(self):
self._file.seek(0)
self._buffer = BytesIO(await self._file.read())
buffer = BytesIO(self._file.read())
image = Image.open(buffer)
return to_embedding(image)


class VHash(VideoHash):
@@ -118,9 +99,6 @@ def _copy_video_to_video_dir(self):
extension = self._file.metadata.get("file_extension")
self.video_path = join(self.video_dir, f"video.{extension}")

get_event_loop().run_until_complete(self._write_file())

async def _write_file(self):
with open(self.video_path, "wb") as file:
self._file.seek(0)
file.write(await self._file.read())
file.write(self._file.read())
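
The to_embedding body is collapsed in this diff, so the following is only a generic sketch of a "dctlowfreq" (pHash-style) embedding using the same imports, not the project's actual implementation:

from numpy import asarray, float32, ndarray
from scipy.fftpack import dct
from PIL import Image

def dctlowfreq_sketch(image: Image.Image, hash_size: int = 8) -> ndarray:
    # Grayscale and downscale, take a 2D DCT, and keep only the
    # low-frequency top-left block as a compact embedding.
    pixels = asarray(image.convert("L").resize((hash_size * 4, hash_size * 4)), dtype=float32)
    freq = dct(dct(pixels, axis=0), axis=1)
    return freq[:hash_size, :hash_size].flatten()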