Skip to content

Commit

Permalink
merg dev into main
Browse files Browse the repository at this point in the history
  • Loading branch information
stevegerrits committed Oct 28, 2024
2 parents 07bebe8 + 614fb3f commit 496b48b
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 46 deletions.
18 changes: 17 additions & 1 deletion vespadb/observations/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import time
from functools import wraps
from django.db import connection, OperationalError
from typing import Callable, TypeVar, Any, cast
from typing import Callable, TypeVar, Any, cast, Generator, List

F = TypeVar("F", bound=Callable[..., Any])

Expand Down Expand Up @@ -55,3 +55,19 @@ def wrapper(*args: Any, **kwargs: Any) -> Any:
raise
return cast(F, wrapper)
return decorator

def retry_query(queryset: Generator[Any, None, None], retries: int = 3, delay: int = 5) -> List[Any]:
"""Execute a query with retries to handle intermittent database connection errors."""
for attempt in range(retries):
try:
return list(queryset)
except OperationalError:
if attempt < retries - 1:
time.sleep(delay)
connection.close()
else:
# Raise a more informative error if retries are exhausted
raise OperationalError(f"Database connection failed after {retries} attempts")

# This return is added to satisfy type checkers, though it should never reach here.
return []
125 changes: 80 additions & 45 deletions vespadb/observations/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
import json
import time
import logging
from typing import TYPE_CHECKING, Any
import csv
import json
from typing import TYPE_CHECKING, Any, Generator, Any, Union

from django.contrib.gis.db.models.functions import Transform
from django.contrib.gis.geos import GEOSGeometry
Expand All @@ -17,8 +19,8 @@
from django.db import transaction
from django.db.models import CharField, OuterRef, QuerySet, Subquery, Value
from django.db.models.functions import Coalesce
from django.db.utils import IntegrityError, OperationalError
from django.http import HttpResponse, JsonResponse
from django.db.utils import IntegrityError
from django.http import HttpResponse, JsonResponse, StreamingHttpResponse, HttpRequest
from django.db import connection
from django.utils.decorators import method_decorator
from django.utils.timezone import now
Expand All @@ -43,7 +45,6 @@
from vespadb.observations.cache import invalidate_geojson_cache, invalidate_observation_cache
from vespadb.observations.filters import ObservationFilter
from vespadb.observations.helpers import parse_and_convert_to_utc
from vespadb.observations.utils import db_retry
from vespadb.observations.models import Municipality, Observation, Province
from vespadb.observations.serializers import (
MunicipalitySerializer,
Expand All @@ -60,8 +61,15 @@
BBOX_LENGTH = 4
GEOJSON_REDIS_CACHE_EXPIRATION = 900 # 15 minutes
GET_REDIS_CACHE_EXPIRATION = 86400 # 1 day


BATCH_SIZE = 150
CSV_HEADERS = [
"id", "created_datetime", "modified_datetime", "location", "source",
"nest_height", "nest_size", "nest_location", "nest_type",
"observation_datetime", "modified_by", "created_by", "province",
"eradication_date", "municipality", "images", "public_domain",
"municipality_name", "modified_by_first_name", "created_by_first_name",
"wn_notes", "eradication_result", "wn_id", "wn_validation_status"
]
class ObservationsViewSet(ModelViewSet): # noqa: PLR0904
"""ViewSet for the Observation model."""

Expand Down Expand Up @@ -635,49 +643,76 @@ def save_observations(self, valid_data: list[dict[str, Any]]) -> Response:
)
@method_decorator(ratelimit(key="ip", rate="60/m", method="GET", block=True))
@action(detail=False, methods=["get"], permission_classes=[AllowAny])
def export(self, request: Request) -> Response:
retries = 3
delay = 5

for attempt in range(retries):
try:
export_format = request.query_params.get("export_format", "csv").lower()
queryset = self.filter_queryset(self.get_queryset())
def export(self, request: HttpRequest) -> Union[StreamingHttpResponse, JsonResponse]:
"""
Export observations data as CSV in a memory-efficient, streamable format.
Handles large datasets by streaming data in chunks to avoid memory overload.
Only supports CSV export; JSON format is no longer available.
"""
if request.query_params.get("export_format", "csv").lower() != "csv":
return JsonResponse({"error": "Only CSV export is supported"}, status=400)

serialized_data = []
errors = []
# Filter queryset
queryset = self.filter_queryset(self.get_queryset())

for obj in queryset.iterator(chunk_size=100):
try:
serialized_obj = self.get_serializer(obj).data
serialized_data.append(serialized_obj)
except Exception as inner_error:
errors.append({
"id": obj.id,
"error": str(inner_error)
})

if export_format == "csv":
return self.export_as_csv(serialized_data)
return JsonResponse({"data": serialized_data, "errors": errors}, safe=False)

except OperationalError:
if attempt < retries - 1:
time.sleep(delay)
connection.close()
else:
return JsonResponse({"error": "Database connection failed after retries"}, safe=False)
except Exception as e:
return JsonResponse({"errors": str(e)}, safe=False)

def export_as_csv(self, data: list[dict[str, Any]]) -> HttpResponse:
"""Export the data as a CSV file."""
response = HttpResponse(content_type="text/csv")
writer = csv.DictWriter(response, fieldnames=data[0].keys())
writer.writeheader()
writer.writerows(data)
# Define response with streaming CSV data
response = StreamingHttpResponse(
self.generate_csv_rows(queryset), content_type="text/csv"
)
response["Content-Disposition"] = 'attachment; filename="observations_export.csv"'
return response

def generate_csv_rows(self, queryset: QuerySet) -> Generator[bytes, None, None]:
"""
Generator that yields rows of CSV data, handling large datasets efficiently.
Converts each observation to a dictionary row, handling missing or misconfigured
data gracefully, and writes to CSV format on-the-fly to avoid memory overuse.
"""
# Yield CSV header row
yield self._csv_line(CSV_HEADERS)

# Iterate over queryset in chunks to avoid high memory usage
for obj in queryset.iterator(chunk_size=500):
row = self.serialize_observation(obj)
yield self._csv_line(row)


def serialize_observation(self, obj: Observation) -> list[str]:
"""
Serialize observation to a list of values in the same order as headers.
Handles potential data misconfigurations, such as missing attributes or
inconsistent formats, to ensure robust data handling.
"""
try:
return [
str(getattr(obj, field, "")) or "" for field in [
"id", "created_datetime", "modified_datetime", "location", "source",
"nest_height", "nest_size", "nest_location", "nest_type",
"observation_datetime", "modified_by_id", "created_by_id", "province_id",
"eradication_date", "municipality_id", "images", "public_domain",
"municipality_name", "modified_by_first_name", "created_by_first_name",
"wn_notes", "eradication_result", "wn_id", "wn_validation_status"
]
]
except Exception as e:
# Log and handle any serialization issues
logger.exception(f"Error serializing observation {obj.id}: {e}")
return [""] * len(CSV_HEADERS)

def _csv_line(self, row: list[str]) -> bytes:
"""
Converts a list of strings into a CSV-encoded line.
Ensures each row is CSV-compatible and byte-encoded for StreamingHttpResponse.
"""
buffer = io.StringIO()
writer = csv.writer(buffer)
writer.writerow(row)
return buffer.getvalue().encode("utf-8")

@require_GET
def search_address(request: Request) -> JsonResponse:
"""
Expand Down

0 comments on commit 496b48b

Please sign in to comment.