export and filter #283

Merged · 1 commit · Jan 3, 2025
10 changes: 5 additions & 5 deletions src/components/FilterComponent.vue
@@ -98,7 +98,7 @@ export default {
{ name: 'Zichtbaar', value: true },
{ name: 'Niet zichtbaar', value: false }
]);
const minDate = ref(new Date(new Date().getFullYear(), 3, 1));
const minDate = ref(new Date(2024, 3, 1));
const maxDate = ref(null);
const selectedObservationStart = ref(false);
const selectedObservationEnd = ref(false);
@@ -121,7 +121,7 @@
max_observation_date: maxDateCET,
visible: visibleActief.value
});
}, 300);
const toggleMenu1 = () => {
@@ -156,11 +156,11 @@
watch([selectedMunicipalities, selectedProvinces, selectedNestType, selectedNestStatus, anbAreasActief, selectedObservationStart, selectedObservationEnd, visibleActief], () => {
emitFilterUpdate();
}, { deep: true});
}, { deep: true });
watch(() => vespaStore.filters, (newFilters, oldFilters) => {
const hasChanged = JSON.stringify(newFilters) !== JSON.stringify(oldFilters);
if (hasChanged) {
selectedMunicipalities.value = newFilters.municipalities || [];
selectedProvinces.value = newFilters.provinces || [];
93 changes: 58 additions & 35 deletions vespadb/observations/tasks/generate_export.py
@@ -128,49 +128,54 @@ def generate_rows(queryset, is_admin: bool, user_municipality_ids: set) -> Itera
acks_late=True
)
def generate_export(export_id: int, filters: Dict[str, Any], user_id: Optional[int] = None) -> Dict[str, Any]:
"""
Generate CSV export of observations based on filters.
Args:
export_id: ID of the Export record
filters: Dictionary of filters to apply to the queryset
user_id: Optional ID of the user requesting the export
Returns:
Dictionary containing export status and details
"""
logger.info(f"Starting export {export_id} for user {user_id}")
"""Generate CSV export of observations based on filters."""
logger.info(f"Starting export {export_id} for user {user_id} with filters: {filters}")
export = Export.objects.get(id=export_id)

try:
# Update export status
export.status = 'processing'
export.save()
logger.info(f"Export {export_id} status set to processing")

# Validate and preprocess filters
# Clean and validate filters before applying
valid_fields = {field.name: field for field in Observation._meta.get_fields()}
processed_filters = {}

# Log the incoming filters
logger.info(f"Processing filters: {filters}")

for key, value in filters.items():
# Skip pagination and ordering parameters
if key in ['page', 'page_size', 'ordering']:
continue

if key in valid_fields:
field = valid_fields[key]
if isinstance(field, models.BooleanField):
try:
try:
if isinstance(field, models.BooleanField):
processed_filters[key] = parse_boolean(value)
except ValueError:
logger.error(f"Invalid boolean value for filter {key}: {value}")
continue
else:
processed_filters[key] = value
elif value: # Only add non-empty values
processed_filters[key] = value
except ValueError as e:
logger.warning(f"Skipping invalid filter {key}: {value}, error: {e}")
continue

logger.info(f"Processed filters: {processed_filters}")

# Prepare queryset with optimizations
queryset = (Observation.objects
.filter(**processed_filters)
# Apply filters and get initial count
queryset = Observation.objects.filter(**processed_filters)
initial_count = queryset.count()
logger.info(f"Initial queryset count: {initial_count}")

# Add optimizations
queryset = (queryset
.select_related('province', 'municipality', 'reserved_by')
.order_by('id'))

total = queryset.count()
# Process in batches
batch_size = 1000
processed = 0
rows = [CSV_HEADERS] # Start with headers

is_admin = False
user_municipality_ids = set()
@@ -180,16 +185,35 @@ def generate_export(export_id: int, filters: Dict[str, Any], user_id: Optional[i
is_admin = user.is_superuser
user_municipality_ids = set(user.municipalities.values_list('id', flat=True))
except User.DoesNotExist:
pass

logger.info(f"Processing {total} observations for export {export_id}")

# Generate CSV data
rows = list(generate_rows(queryset, is_admin, user_municipality_ids))
logger.warning(f"User {user_id} not found")

# Process in batches to reduce memory usage
for i in range(0, initial_count, batch_size):
batch = queryset[i:i + batch_size]
batch_rows = []

for observation in batch:
try:
row = _prepare_row_data(observation, is_admin, user_municipality_ids)
batch_rows.append(row)
processed += 1

if processed % 100 == 0:
progress = int((processed / initial_count) * 100)
export.progress = progress
export.save()
logger.info(f"Processed {processed}/{initial_count} records")
except Exception as e:
logger.error(f"Error processing observation {observation.id}: {e}")
continue

# Add batch to rows and clear batch data
rows.extend(batch_rows)
batch_rows = []

# Store in cache
cache_key = f'export_{export_id}_data'
cache.set(cache_key, rows, timeout=3600) # Store for 1 hour
cache.set(cache_key, rows, timeout=3600)

# Update export record
with transaction.atomic():
@@ -199,11 +223,10 @@ def generate_export(export_id: int, filters: Dict[str, Any], user_id: Optional[i
export.save()

logger.info(f"Export {export_id} completed successfully")

return {
'status': 'completed',
'cache_key': cache_key,
'total_processed': total
'total_processed': processed
}

except Exception as e:
@@ -212,7 +235,7 @@ def generate_export(export_id: int, filters: Dict[str, Any], user_id: Optional[i
export.error_message = str(e)
export.save()
raise

@shared_task
def cleanup_old_exports() -> None:
"""Clean up exports older than 24 hours."""
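Note that the new filter-cleaning loop calls a parse_boolean helper that is defined elsewhere in the codebase and does not appear in this diff. A minimal sketch of what it might look like, inferred from the except ValueError handling above; only the name and the ValueError contract come from the diff, the accepted spellings are an assumption:

def parse_boolean(value: str) -> bool:
    """Parse a query-string value into a bool; raise ValueError otherwise.

    Hypothetical sketch: only the function name and the fact that it
    raises ValueError on bad input are visible in the diff above.
    """
    normalized = str(value).strip().lower()
    if normalized in {"true", "1", "yes"}:
        return True
    if normalized in {"false", "0", "no"}:
        return False
    raise ValueError(f"Not a boolean: {value!r}")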
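The task caches the finished rows under export_<id>_data for one hour, but the view that actually serves the file is outside this diff. A hedged sketch of how such a download view could stream the cached rows as CSV; the view name and URL wiring are hypothetical, and only the cache-key format, the one-hour timeout, and the headers-first row layout come from the code above:

import csv

from django.core.cache import cache
from django.http import HttpResponse, JsonResponse


def download_export(request, export_id: int) -> HttpResponse:
    # Hypothetical view; only the cache key format comes from the diff.
    rows = cache.get(f"export_{export_id}_data")
    if rows is None:
        # Entries expire after an hour (timeout=3600 in generate_export).
        return JsonResponse({"error": "Export expired or not ready"}, status=404)
    response = HttpResponse(content_type="text/csv")
    response["Content-Disposition"] = f'attachment; filename="export_{export_id}.csv"'
    writer = csv.writer(response)
    writer.writerows(rows)  # first row is CSV_HEADERS, per generate_export
    return response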
17 changes: 14 additions & 3 deletions vespadb/observations/views.py
@@ -652,8 +652,18 @@ def export(self, request: HttpRequest) -> JsonResponse:
if not filterset.is_valid():
return JsonResponse({"error": filterset.errors}, status=400)

# Prepare the filter parameters
filters = {key: value for key, value in request.GET.items()}
# Get the filtered queryset count first
filtered_count = filterset.qs.count()
if filtered_count > 10000:
return JsonResponse({
"error": f"Export too large. Found {filtered_count} records, maximum allowed is 10,000"
}, status=400)

# Prepare the filter parameters - only include valid filters
filters = {}
for key, value in request.GET.items():
if key in filterset.filters and value:
filters[key] = value

# Create an Export record
export = Export.objects.create(
@@ -676,8 +686,9 @@ def export(self, request: HttpRequest) -> JsonResponse:
return JsonResponse({
'export_id': export.id,
'task_id': task.id,
'total_records': filtered_count
})

@swagger_auto_schema(
operation_description="Check the status of an export.",
manual_parameters=[
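From a client's perspective, the export endpoint now answers immediately with export_id, task_id, and total_records while Celery does the work, and a status-check endpoint is documented by the swagger block just above. A rough usage sketch; the base URL and the status route are assumptions, only the response fields are taken from this diff:

import time

import requests

BASE = "https://example.org/api/observations"  # placeholder host and prefix

# Kick off the export; more than 10,000 matching records yields a 400.
job = requests.get(f"{BASE}/export/", params={"visible": "true"}).json()
print(job)  # {'export_id': ..., 'task_id': ..., 'total_records': ...}

# Poll the (assumed) status endpoint until the Celery task finishes.
while True:
    status = requests.get(
        f"{BASE}/export_status/", params={"export_id": job["export_id"]}
    ).json()
    if status.get("status") in {"completed", "failed"}:
        break
    time.sleep(2)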