Skip to content

Commit

Permalink
feat: continue export in a new job after minutes-limit is reached
Browse files Browse the repository at this point in the history
  • Loading branch information
shadinaif committed Jan 22, 2025
1 parent 8cb5115 commit e2dd50e
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 21 deletions.
116 changes: 98 additions & 18 deletions futurex_openedx_extensions/helpers/export_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import logging
import os
import tempfile
from datetime import datetime
from typing import Any, Generator, Optional, Tuple
from urllib.parse import urlencode, urlparse

Expand Down Expand Up @@ -93,32 +94,47 @@ def _paginated_response_generator(
fx_info: dict, view_data: dict, view_instance: Any
) -> Generator:
"""Generator to yield paginated responses."""
url = view_data['url']
page = view_data['start_page']
kwargs = view_data.get('kwargs', {})
processed_records = 0
while url:
processed_records = (page - 1) * view_data['page_size']
url = f'{view_data["url"]}&page={page}'
start_time = datetime.now()
while url and not view_data['end_page']:
mocked_request = _get_mocked_request(url, fx_info)
response = view_instance(mocked_request, **kwargs)
data, total_records = _get_response_data(response)
processed_records += len(data)

progress = round(processed_records / total_records, 2) if total_records else 0
yield data, progress, processed_records
if (datetime.now() - start_time).total_seconds() > settings.FX_TASK_MINUTES_LIMIT * 60:
view_data['end_page'] = page
page += 1
url = response.data.get('next')


def _get_storage_dir(dir_name: str) -> str:
    """
    Return the storage directory that holds the exported files of `dir_name`.

    :param dir_name: Name of the parent directory; callers in this module pass the tenant ID as a string.
    :type dir_name: str
    :return: `<settings.FX_DASHBOARD_STORAGE_DIR>/<dir_name>/exported_files`
    :rtype: str
    """
    return os.path.join(settings.FX_DASHBOARD_STORAGE_DIR, f'{dir_name}/exported_files')


def _upload_file_to_storage(local_file_path: str, filename: str, tenant_id: int) -> str:
def _upload_file_to_storage(local_file_path: str, filename: str, tenant_id: int, partial: int = 0) -> str:
"""
Upload a file to the default storage (e.g., S3).
:param local_file_path: Path to the local file to upload
:type local_file_path: str
:param filename: filename for generated CSV
:type filename: str
:param tenant_id: The tenant ID.
:type tenant_id: int
:param partial: The partial file number.
:type partial: int
:return: The path of the uploaded file
:rtype: str
"""
if partial:
filename = f'{filename}_parts/{filename}_{partial:06d}'
storage_path = os.path.join(_get_storage_dir(str(tenant_id)), filename)
with open(local_file_path, 'rb') as file:
content_file = ContentFile(file.read())
Expand All @@ -130,11 +146,53 @@ def _upload_file_to_storage(local_file_path: str, filename: str, tenant_id: int)
return storage_path


def _combine_partial_files(task_id: int, filename: str, tenant_id: int) -> None:
    """
    Combine the partial files of an export into a single file.

    The partial files are read from `<storage dir>/<filename>_parts` in sorted name order,
    concatenated into a local temporary file, and uploaded as `filename`. After a successful
    upload, the partial files and their directory are deleted from the storage. The local
    temporary file is always removed, whether the operation succeeds or fails.

    :param task_id: The task ID (used for logging).
    :type task_id: int
    :param filename: The filename of the combined file; partial files live under `<filename>_parts`.
    :type filename: str
    :param tenant_id: The tenant ID that owns the partial files and the combined file.
    :type tenant_id: int
    """
    storage_dir = _get_storage_dir(str(tenant_id))
    parts_dir = os.path.join(storage_dir, f'{filename}_parts')
    # listdir returns a (directories, files) pair; only the files are needed
    partial_files = default_storage.listdir(parts_dir)[1]

    log.info('CSV Export: combining partial files for task %s...', task_id)
    # Stays None until the temporary file really exists, so the cleanup below never
    # touches an unbound name when the file could not be created
    tmp_path = None
    try:
        with tempfile.NamedTemporaryFile(mode='w', newline='', encoding='utf-8', delete=False) as tmp_file:
            tmp_path = tmp_file.name
            for partial_file in sorted(partial_files):
                with default_storage.open(os.path.join(parts_dir, partial_file)) as file:
                    tmp_file.write(file.read().decode())

        # Upload only after the `with` block has closed the file, so that all buffered
        # content is flushed to disk before it is read back
        log.info('CSV Export: uploading combined file for task %s...', task_id)
        # Write to the same tenant the parts were read from
        _upload_file_to_storage(tmp_path, filename, tenant_id)
        log.info('CSV Export: file uploaded successfully for task %s...', task_id)

        log.info('CSV Export: deleting partial files for task %s...', task_id)
        for partial_file in partial_files:
            default_storage.delete(os.path.join(parts_dir, partial_file))
        log.info('CSV Export: deleting partial files directory for task %s...', task_id)
        default_storage.delete(parts_dir)
        log.info('CSV Export: partial files directory deleted successfully for task %s...', task_id)

    finally:
        if tmp_path is not None:
            try:
                os.remove(tmp_path)
                log.info('CSV Export: temporary combined file removed for task %s...', task_id)
            except OSError as exc:
                # Best-effort cleanup: a leftover temporary file must not fail the export
                log.warning('CSV Export: failed to remove temporary combined file for task %s: %s', task_id, exc)


def _generate_csv_with_tracked_progress(
task_id: int, fx_permission_info: dict, view_data: dict, filename: str, view_instance: Any
) -> str:
) -> bool:
"""
Generate the response with progress tracking and write the data to a CSV file.
:param task_id: task id will be used to update progress
:type task_id: int
:param fx_permission_info: contains role and permission info
Expand All @@ -145,11 +203,12 @@ def _generate_csv_with_tracked_progress(
:type filename: str
:param view_instance: view instance
:type view_instance: Any
:return: return default storage file path
:return: True if export fully completed, False if partially completed
:rtype: bool
"""
fully_completed = True
page_size = view_data['page_size']
storage_path = None
batch_count = 0
batch_count = view_data.get('processed_batches', 0)
try:
with tempfile.NamedTemporaryFile(mode='w', newline='', encoding='utf-8', delete=False) as tmp_file:
for data, progress, processed_records in _paginated_response_generator(
Expand All @@ -161,7 +220,7 @@ def _generate_csv_with_tracked_progress(
batch_count,
processed_records,
task_id,
progress * 100,
round(progress * 100, 2),
)
if data:
log.info('CSV Export: writing batch %s of task %s...', batch_count, task_id)
Expand All @@ -177,34 +236,55 @@ def _generate_csv_with_tracked_progress(
else:
log.warning('CSV Export: batch %s of task %s is empty!', batch_count, task_id)

log.info('CSV Export: uploading generated file for task %s...', task_id)
storage_path = _upload_file_to_storage(
tmp_file.name, filename, DataExportTask.get_task(task_id=task_id).tenant.id,
view_data['processed_batches'] = batch_count

if view_data['end_page']:
log.info('CSV Export: uploading partial file for task %s.', task_id)
else:
log.info('CSV Export: uploading file for task %s (no partial files).', task_id)
_upload_file_to_storage(
tmp_file.name, filename, DataExportTask.get_task(task_id=task_id).tenant.id, view_data['end_page'] or 0,
)
log.info('CSV Export: file uploaded successfully for task %s...', task_id)

if view_data['start_page'] == 1 and view_data['end_page'] is None:
log.info('CSV Export: file uploaded successfully for task %s (no partial files).', task_id)
elif view_data['end_page']:
log.info('CSV Export: partial file uploaded successfully for task %s.', task_id)
view_data['start_page'] = view_data['end_page'] + 1
view_data['end_page'] = None
fully_completed = False
else:
_combine_partial_files(task_id, filename, DataExportTask.get_task(task_id=task_id).tenant.id)

finally:
try:
os.remove(tmp_file.name)
log.info('CSV Export: temporary file removed for task %s...', task_id)
except Exception as exc:
log.info('CSV Export: failed to remove temporary file for task %s: %s', task_id, str(exc))
return storage_path

return fully_completed


def export_data_to_csv(
task_id: int, url: str, view_data: dict, fx_permission_info: dict, filename: str
) -> str:
) -> bool:
"""
Mock view with given view params and write JSON response to CSV
:param task_id: task id will be used to update progress
:type task_id: int
:param url: view url will be used to mock view and get response
:type url: str
:param view_data: required data for mocking
:type view_data: dict
:param fx_permission_info: contains role and permission info
:type fx_permission_info: dict
:param filename: filename for generated CSV
:type filename: str
:return: generated filename
:return: True if export fully completed, False if partially completed
:rtype: bool
"""
if urlparse(url).query:
raise FXCodedException(
Expand All @@ -229,7 +309,7 @@ def export_data_to_csv(
page_size = view_pagination_class.max_page_size or page_size

query_params['page_size'] = page_size
url_with_query_str = f'{url}?{urlencode(query_params)}' if query_params else url
url_with_query_str = f'{url}?{urlencode(query_params)}'

# Ensure the filename ends with .csv
if not filename.endswith('.csv'):
Expand Down
2 changes: 2 additions & 0 deletions futurex_openedx_extensions/helpers/export_mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ def generate_csv_url_response(self) -> dict:
'query_params': filtered_query_params,
'kwargs': self.kwargs, # type: ignore[attr-defined]
'path': self.request.path, # type: ignore[attr-defined]
'start_page': 1,
'end_page': None,
}
tenant_id = self.request.fx_permission_info[ # type: ignore[attr-defined]
'view_allowed_tenant_ids_any_access'
Expand Down
1 change: 1 addition & 0 deletions futurex_openedx_extensions/helpers/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,7 @@ def set_status(cls, task_id: int, status: str, error_message: str = None) -> Non
fx_task.started_at = timezone.now()
if status == cls.STATUS_COMPLETED:
fx_task.completed_at = timezone.now()
fx_task.progress = 1.0
fx_task.save()

@classmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,10 @@ def plugin_settings(settings: Any) -> None:
'FX_DEFAULT_COURSE_EFFORT',
12,
)

# Time limit (in minutes) for a single export job before the export continues in a new job
settings.FX_TASK_MINUTES_LIMIT = getattr(
settings,
'FX_TASK_MINUTES_LIMIT',
15,
)
13 changes: 10 additions & 3 deletions futurex_openedx_extensions/helpers/tasks.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""FX Helpers celery tasks"""
import copy
import logging

from celery import shared_task
Expand All @@ -21,9 +22,15 @@ def export_data_to_csv_task(
try:
_ = DataExportTask.get_task(fx_task_id)

export_data_to_csv(fx_task_id, url, view_data, fx_permission_info, filename)

DataExportTask.set_status(task_id=fx_task_id, status=DataExportTask.STATUS_COMPLETED)
if export_data_to_csv(fx_task_id, url, view_data, copy.deepcopy(fx_permission_info), filename):
DataExportTask.set_status(task_id=fx_task_id, status=DataExportTask.STATUS_COMPLETED)
else:
log.info(
'CSV Export: initiating a continue job starting from page %s for task %s.',
view_data['start_page'],
fx_task_id,
)
export_data_to_csv_task.delay(fx_task_id, url, view_data, fx_permission_info, filename)

except FXCodedException as exc:
if exc.code == FXExceptionCodes.EXPORT_CSV_TASK_NOT_FOUND.value:
Expand Down
1 change: 1 addition & 0 deletions test_utils/test_settings_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ def root(*args):
FX_CACHE_TIMEOUT_COURSE_ACCESS_ROLES = 60 * 31 # 31 minutes
FX_CACHE_TIMEOUT_TENANTS_INFO = 60 * 60 * 3 # 3 hours
FX_CACHE_TIMEOUT_VIEW_ROLES = 60 * 31 # 31 minutes
FX_TASK_MINUTES_LIMIT = 16 # 16 minutes

FX_CLICKHOUSE_USER = 'dummy_test_user'
FX_CLICKHOUSE_PASSWORD = 'dummy_test_password'
Expand Down
1 change: 1 addition & 0 deletions tests/test_helpers/test_apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
('FX_CACHE_TIMEOUT_VIEW_ROLES', 60 * 30), # 30 minutes
('FX_DASHBOARD_STORAGE_DIR', 'fx_dashboard'), # fx_dashboard
('FX_DEFAULT_COURSE_EFFORT', 12), # 12 hours
('FX_TASK_MINUTES_LIMIT', 15), # 15 minutes
]


Expand Down

0 comments on commit e2dd50e

Please sign in to comment.