Skip to content

Commit

Permalink
Merge pull request #113 from dnstapir/detect_duplicates
Browse files Browse the repository at this point in the history
Handle duplicate aggregates
  • Loading branch information
jschlyter authored Nov 13, 2024
2 parents 5b3fee4 + 8dc0394 commit 73ea593
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 3 deletions.
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ test-client: test-client-p256 test-client-ed25519
test-client-p256: test-private-p256.pem
openssl rand 1024 > random.bin
poetry run aggrec_client --http-key-id test-p256 --http-key-file $< random.bin
openssl rand 1024 > random.bin
poetry run aggrec_client --http-key-id test-p256 --http-key-file $< random.bin
poetry run aggrec_client --http-key-id test-p256 --http-key-file $< random.bin
poetry run aggrec_client --http-key-id test-p256 --http-key-file $< random.bin

test-client-ed25519: test-private-ed25519.pem
openssl rand 1024 > random.bin
Expand Down
30 changes: 27 additions & 3 deletions aggrec/aggregates.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,16 @@
description="The number of aggregates stored",
)

aggregates_by_creator_counter = meter.create_counter(
"aggregates.counter_by_creator",
description="The number of aggregates per creator",
)

aggregates_duplicates_counter = meter.create_counter(
"aggregates.duplicates_counter",
description="The number of duplicate aggregates received",
)


METADATA_HTTP_HEADERS = [
"User-Agent",
Expand Down Expand Up @@ -107,6 +117,11 @@ def get_http_headers(request: Request, covered_components_headers: List[str]) ->
return res


def get_aggregate_location(aggregate_id: ObjectId) -> str:
"""Get aggregate location"""
return f"/api/v1/aggregates/{aggregate_id}"


def get_new_aggregate_event_message(metadata: AggregateMetadata, settings: Settings) -> dict:
"""Get new aggregate event message"""
return {
Expand Down Expand Up @@ -246,8 +261,15 @@ async def create_aggregate(

http_headers = get_http_headers(request, res.covered_components.keys())

# if we receive an aggregate already seen, return existing metadata
if metadata := AggregateMetadata.objects(content_digest=content_digest).first():
logger.warning("Received duplicate aggregate from %s", creator)
aggregates_duplicates_counter.add(1, {"aggregate_type": aggregate_type.value, "creator": creator})
metadata_location = get_aggregate_location(metadata.id)
return Response(status_code=status.HTTP_201_CREATED, headers={"Location": metadata_location})

aggregate_id = ObjectId()
location = f"/api/v1/aggregates/{aggregate_id}"
metadata_location = get_aggregate_location(aggregate_id)

span.set_attribute("aggregate.id", str(aggregate_id))
span.set_attribute("aggregate.type", aggregate_type.value)
Expand All @@ -273,6 +295,7 @@ async def create_aggregate(
creator=creator,
http_headers=http_headers,
content_type=content_type,
content_digest=content_digest,
s3_bucket=s3_bucket,
)

Expand Down Expand Up @@ -304,6 +327,7 @@ async def create_aggregate(
logger.info("Metadata saved: %s", metadata.id)

aggregates_counter.add(1, {"aggregate_type": aggregate_type.value})
aggregates_by_creator_counter.add(1, {"aggregate_type": aggregate_type.value, "creator": creator})

async with request.app.get_mqtt_client() as mqtt_client:
with tracer.start_as_current_span("mqtt.publish"):
Expand All @@ -312,7 +336,7 @@ async def create_aggregate(
json.dumps(get_new_aggregate_event_message(metadata, request.app.settings)),
)

return Response(status_code=status.HTTP_201_CREATED, headers={"Location": location})
return Response(status_code=status.HTTP_201_CREATED, headers={"Location": metadata_location})


@router.get(
Expand Down Expand Up @@ -372,7 +396,7 @@ async def get_aggregate_payload(
async with request.app.get_s3_client() as s3_client:
s3_obj = await s3_client.get_object(Bucket=metadata.s3_bucket, Key=metadata.s3_object_key)

metadata_location = f"/api/v1/aggregates/{aggregate_id}"
metadata_location = get_aggregate_location(metadata.id)

return StreamingResponse(
content=s3_obj["Body"],
Expand Down
1 change: 1 addition & 0 deletions aggrec/db_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class AggregateMetadata(Document):

content_type = StringField()
content_length = IntField()
content_digest = StringField(unique=True, sparse=True)

s3_bucket = StringField()
s3_object_key = StringField()
Expand Down

0 comments on commit 73ea593

Please sign in to comment.