From 30f9038e11a641f2f44f2ccbf315edfc6b3573f5 Mon Sep 17 00:00:00 2001 From: Steven Gerrits Date: Fri, 3 Jan 2025 17:26:53 +0000 Subject: [PATCH] export and filter --- src/components/FilterComponent.vue | 10 +- vespadb/observations/tasks/generate_export.py | 93 ++++++++++++------- vespadb/observations/views.py | 17 +++- 3 files changed, 77 insertions(+), 43 deletions(-) diff --git a/src/components/FilterComponent.vue b/src/components/FilterComponent.vue index 1903c45..8b4761a 100644 --- a/src/components/FilterComponent.vue +++ b/src/components/FilterComponent.vue @@ -98,7 +98,7 @@ export default { { name: 'Zichtbaar', value: true }, { name: 'Niet zichtbaar', value: false } ]); - const minDate = ref(new Date(new Date().getFullYear(), 3, 1)); + const minDate = ref(new Date(2024, 3, 1)); const maxDate = ref(null); const selectedObservationStart = ref(false); const selectedObservationEnd = ref(false); @@ -121,7 +121,7 @@ export default { max_observation_date: maxDateCET, visible: visibleActief.value }); - + }, 300); const toggleMenu1 = () => { @@ -156,11 +156,11 @@ export default { watch([selectedMunicipalities, selectedProvinces, selectedNestType, selectedNestStatus, anbAreasActief, selectedObservationStart, selectedObservationEnd, visibleActief], () => { emitFilterUpdate(); - }, { deep: true}); - + }, { deep: true }); + watch(() => vespaStore.filters, (newFilters, oldFilters) => { const hasChanged = JSON.stringify(newFilters) !== JSON.stringify(oldFilters); - + if (hasChanged) { selectedMunicipalities.value = newFilters.municipalities || []; selectedProvinces.value = newFilters.provinces || []; diff --git a/vespadb/observations/tasks/generate_export.py b/vespadb/observations/tasks/generate_export.py index 21aef1f..02ce2d3 100644 --- a/vespadb/observations/tasks/generate_export.py +++ b/vespadb/observations/tasks/generate_export.py @@ -128,49 +128,54 @@ def generate_rows(queryset, is_admin: bool, user_municipality_ids: set) -> Itera acks_late=True ) def generate_export(export_id: int, filters: Dict[str, Any], user_id: Optional[int] = None) -> Dict[str, Any]: - """ - Generate CSV export of observations based on filters. - - Args: - export_id: ID of the Export record - filters: Dictionary of filters to apply to the queryset - user_id: Optional ID of the user requesting the export - - Returns: - Dictionary containing export status and details - """ - logger.info(f"Starting export {export_id} for user {user_id}") + """Generate CSV export of observations based on filters.""" + logger.info(f"Starting export {export_id} for user {user_id} with filters: {filters}") export = Export.objects.get(id=export_id) try: # Update export status export.status = 'processing' export.save() - logger.info(f"Export {export_id} status set to processing") - # Validate and preprocess filters + # Clean and validate filters before applying valid_fields = {field.name: field for field in Observation._meta.get_fields()} processed_filters = {} + + # Log the incoming filters + logger.info(f"Processing filters: {filters}") + for key, value in filters.items(): + # Skip pagination and ordering parameters + if key in ['page', 'page_size', 'ordering']: + continue + if key in valid_fields: field = valid_fields[key] - if isinstance(field, models.BooleanField): - try: + try: + if isinstance(field, models.BooleanField): processed_filters[key] = parse_boolean(value) - except ValueError: - logger.error(f"Invalid boolean value for filter {key}: {value}") - continue - else: - processed_filters[key] = value + elif value: # Only add non-empty values + processed_filters[key] = value + except ValueError as e: + logger.warning(f"Skipping invalid filter {key}: {value}, error: {e}") + continue + + logger.info(f"Processed filters: {processed_filters}") - # Prepare queryset with optimizations - queryset = (Observation.objects - .filter(**processed_filters) + # Apply filters and get initial count + queryset = Observation.objects.filter(**processed_filters) + initial_count = queryset.count() + logger.info(f"Initial queryset count: {initial_count}") + + # Add optimizations + queryset = (queryset .select_related('province', 'municipality', 'reserved_by') .order_by('id')) - total = queryset.count() + # Process in batches + batch_size = 1000 processed = 0 + rows = [CSV_HEADERS] # Start with headers is_admin = False user_municipality_ids = set() @@ -180,16 +185,35 @@ def generate_export(export_id: int, filters: Dict[str, Any], user_id: Optional[i is_admin = user.is_superuser user_municipality_ids = set(user.municipalities.values_list('id', flat=True)) except User.DoesNotExist: - pass - - logger.info(f"Processing {total} observations for export {export_id}") - - # Generate CSV data - rows = list(generate_rows(queryset, is_admin, user_municipality_ids)) + logger.warning(f"User {user_id} not found") + + # Process in batches to reduce memory usage + for i in range(0, initial_count, batch_size): + batch = queryset[i:i + batch_size] + batch_rows = [] + + for observation in batch: + try: + row = _prepare_row_data(observation, is_admin, user_municipality_ids) + batch_rows.append(row) + processed += 1 + + if processed % 100 == 0: + progress = int((processed / initial_count) * 100) + export.progress = progress + export.save() + logger.info(f"Processed {processed}/{initial_count} records") + except Exception as e: + logger.error(f"Error processing observation {observation.id}: {e}") + continue + + # Add batch to rows and clear batch data + rows.extend(batch_rows) + batch_rows = [] # Store in cache cache_key = f'export_{export_id}_data' - cache.set(cache_key, rows, timeout=3600) # Store for 1 hour + cache.set(cache_key, rows, timeout=3600) # Update export record with transaction.atomic(): @@ -199,11 +223,10 @@ def generate_export(export_id: int, filters: Dict[str, Any], user_id: Optional[i export.save() logger.info(f"Export {export_id} completed successfully") - return { 'status': 'completed', 'cache_key': cache_key, - 'total_processed': total + 'total_processed': processed } except Exception as e: @@ -212,7 +235,7 @@ def generate_export(export_id: int, filters: Dict[str, Any], user_id: Optional[i export.error_message = str(e) export.save() raise - + @shared_task def cleanup_old_exports() -> None: """Clean up exports older than 24 hours.""" diff --git a/vespadb/observations/views.py b/vespadb/observations/views.py index 28b6b9b..ede1019 100644 --- a/vespadb/observations/views.py +++ b/vespadb/observations/views.py @@ -652,8 +652,18 @@ def export(self, request: HttpRequest) -> JsonResponse: if not filterset.is_valid(): return JsonResponse({"error": filterset.errors}, status=400) - # Prepare the filter parameters - filters = {key: value for key, value in request.GET.items()} + # Get the filtered queryset count first + filtered_count = filterset.qs.count() + if filtered_count > 10000: + return JsonResponse({ + "error": f"Export too large. Found {filtered_count} records, maximum allowed is 10,000" + }, status=400) + + # Prepare the filter parameters - only include valid filters + filters = {} + for key, value in request.GET.items(): + if key in filterset.filters and value: + filters[key] = value # Create an Export record export = Export.objects.create( @@ -676,8 +686,9 @@ def export(self, request: HttpRequest) -> JsonResponse: return JsonResponse({ 'export_id': export.id, 'task_id': task.id, + 'total_records': filtered_count }) - + @swagger_auto_schema( operation_description="Check the status of an export.", manual_parameters=[