diff --git a/subprojects/micronaut-amazon-awssdk-dynamodb/src/main/java/com/agorapulse/micronaut/amazon/awssdk/dynamodb/AsyncDynamoDbServiceIntroduction.java b/subprojects/micronaut-amazon-awssdk-dynamodb/src/main/java/com/agorapulse/micronaut/amazon/awssdk/dynamodb/AsyncDynamoDbServiceIntroduction.java index 2f9c656a5..c0a1ac8d9 100644 --- a/subprojects/micronaut-amazon-awssdk-dynamodb/src/main/java/com/agorapulse/micronaut/amazon/awssdk/dynamodb/AsyncDynamoDbServiceIntroduction.java +++ b/subprojects/micronaut-amazon-awssdk-dynamodb/src/main/java/com/agorapulse/micronaut/amazon/awssdk/dynamodb/AsyncDynamoDbServiceIntroduction.java @@ -78,7 +78,7 @@ public Object doIntercept(MethodInvocationContext context, C try { return doIntercept(context, service); } catch (ResourceNotFoundException ignored) { - return unwrapIfRequired(Flux.from(service.createTable()).map(t -> doIntercept(context, service)), context.getReturnType().getType()); + return unwrapIfRequired(Flux.from(service.createTable()).map(t -> doIntercept(context, service)), context); } } @@ -112,64 +112,64 @@ private Publisher toPublisher(Class type, Argument itemArgument, Ma private Object doIntercept(MethodInvocationContext context, AsyncDynamoDbService service) { String methodName = context.getMethodName(); if (methodName.startsWith("save")) { - return unwrapIfRequired(handleSave(service, context), context.getReturnType().getType()); + return unwrapIfRequired(handleSave(service, context), context); } if (methodName.startsWith("get") || methodName.startsWith("load")) { - return unwrapIfRequired(handleGet(service, context), context.getReturnType().getType()); + return unwrapIfRequired(handleGet(service, context), context); } if (context.getTargetMethod().isAnnotationPresent(Query.class)) { DetachedQuery criteria = functionEvaluator.evaluateAnnotationType(context.getTargetMethod().getAnnotation(Query.class).value(), context); if (methodName.startsWith("count")) { - return unwrapIfRequired(service.count(criteria), context.getReturnType().getType()); + return unwrapIfRequired(service.count(criteria), context); } Publisher queryResult = service.query(criteria); if (methodName.startsWith("delete")) { - return unwrapIfRequired(service.deleteAll(queryResult), context.getReturnType().getType()); + return unwrapIfRequired(service.deleteAll(queryResult), context); } if (context.getTargetMethod().isAnnotationPresent(Update.class)) { UpdateBuilder update = (UpdateBuilder) functionEvaluator.evaluateAnnotationType(context.getTargetMethod().getAnnotation(Update.class).value(), context); - return unwrapIfRequired(service.updateAll(queryResult, update), context.getReturnType().getType()); + return unwrapIfRequired(service.updateAll(queryResult, update), context); } - return unwrapIfRequired(queryResult, context.getReturnType().getType()); + return unwrapIfRequired(queryResult, context); } if (context.getTargetMethod().isAnnotationPresent(Scan.class)) { DetachedScan criteria = functionEvaluator.evaluateAnnotationType(context.getTargetMethod().getAnnotation(Scan.class).value(), context); if (methodName.startsWith("count")) { - return unwrapIfRequired(service.count(criteria), context.getReturnType().getType()); + return unwrapIfRequired(service.count(criteria), context); } Publisher scanResult = service.scan(criteria); if (methodName.startsWith("delete")) { - return unwrapIfRequired(service.deleteAll(scanResult), context.getReturnType().getType()); + return unwrapIfRequired(service.deleteAll(scanResult), context); } if (context.getTargetMethod().isAnnotationPresent(Update.class)) { UpdateBuilder update = (UpdateBuilder) functionEvaluator.evaluateAnnotationType(context.getTargetMethod().getAnnotation(Update.class).value(), context); - return unwrapIfRequired(service.updateAll(scanResult, update), context.getReturnType().getType()); + return unwrapIfRequired(service.updateAll(scanResult, update), context); } - return unwrapIfRequired(scanResult, context.getReturnType().getType()); + return unwrapIfRequired(scanResult, context); } if (context.getTargetMethod().isAnnotationPresent(Update.class)) { DetachedUpdate criteria = functionEvaluator.evaluateAnnotationType(context.getTargetMethod().getAnnotation(Update.class).value(), context); - return unwrapIfRequired(service.update(criteria), context.getReturnType().getType()); + return unwrapIfRequired(service.update(criteria), context); } if (methodName.startsWith("delete")) { Optional maybeItemArgument = ItemArgument.findItemArgument(service.getItemType(), context); if (maybeItemArgument.isPresent()) { - return unwrapIfRequired(handleDelete(service, context, maybeItemArgument), context.getReturnType().getType()); + return unwrapIfRequired(handleDelete(service, context, maybeItemArgument), context); } } @@ -177,30 +177,32 @@ private Object doIntercept(MethodInvocationContext context, QueryArguments partitionAndSort = QueryArguments.create(context, service.getTable().tableSchema().tableMetadata(), service.getItemType()); if (methodName.startsWith("count")) { if (partitionAndSort.isCustomized()) { - return unwrapIfRequired(service.countUsingQuery(partitionAndSort.generateQuery(context, conversionService)), context.getReturnType().getType()); + return unwrapIfRequired(service.countUsingQuery(partitionAndSort.generateQuery(context, conversionService)), context); } - return unwrapIfRequired(service.count(partitionAndSort.getPartitionValue(context.getParameters()), partitionAndSort.getSortValue(context.getParameters())), context.getReturnType().getType()); + return unwrapIfRequired(service.count(partitionAndSort.getPartitionValue(context.getParameters()), partitionAndSort.getSortValue(context.getParameters())), context); } if (methodName.startsWith("delete")) { if (partitionAndSort.isCustomized()) { - return unwrapIfRequired(service.deleteAll(service.query(partitionAndSort.generateQuery(context, conversionService))), context.getReturnType().getType()); + return unwrapIfRequired(service.deleteAll(service.query(partitionAndSort.generateQuery(context, conversionService))), context); } Optional maybeItemArgument = ItemArgument.findItemArgument(service.getItemType(), context); - return unwrapIfRequired(handleDelete(service, context, maybeItemArgument), context.getReturnType().getType()); + return unwrapIfRequired(handleDelete(service, context, maybeItemArgument), context); } if (partitionAndSort.isCustomized()) { - return unwrapIfRequired(service.query(partitionAndSort.generateQuery(context, conversionService)), context.getReturnType().getType()); + return unwrapIfRequired(service.query(partitionAndSort.generateQuery(context, conversionService)), context); } return unwrapIfRequired( service.findAll(partitionAndSort.getPartitionValue(context.getParameters()), partitionAndSort.getSortValue(context.getParameters())), - context.getReturnType().getType() + context ); } throw new UnsupportedOperationException("Cannot implement method " + context.getExecutableMethod().getTargetMethod()); } - private Object unwrapIfRequired(Publisher publisher, Class type) { + private Object unwrapIfRequired(Publisher publisherWithoutCheckpoint, MethodInvocationContext context) { + Class type = context.getReturnType().getType(); + Publisher publisher = publisherWithCheckpoint(publisherWithoutCheckpoint, context); if (void.class.isAssignableFrom(type) || Void.class.isAssignableFrom(type)) { return Mono.from(publisher).block(); } @@ -322,4 +324,24 @@ private Publisher handleGet(AsyncDynamoDbService service, MethodInvoca return service.get(partitionValue, partitionAndSort.getSortValue(params)); } + private Publisher publisherWithCheckpoint(Publisher publisher, MethodInvocationContext context) { + if (publisher instanceof Mono mono) { + return monoWithCheckpoint(mono, context); + } + + if (publisher instanceof Flux flux) { + return fluxWithCheckpoint(flux, context); + } + + return Flux.from(publisher).checkpoint(context.getExecutableMethod().toString(), true); + } + + private Mono monoWithCheckpoint(Publisher publisher, MethodInvocationContext context) { + return Mono.from(publisher).checkpoint(context.getExecutableMethod().toString(), true); + } + + private Flux fluxWithCheckpoint(Publisher publisher, MethodInvocationContext context) { + return Flux.from(publisher).checkpoint(context.getExecutableMethod().toString(), true); + } + }