Skip to content

Commit

Permalink
Chore/better error handling for async dynamodb (#274)
Browse files Browse the repository at this point in the history
* Allowing Stream as return type from DynamoDB declarative service

* fixed wrong import

* added checkpoints to async dynamodb service introduction
  • Loading branch information
musketyr authored Jan 9, 2025
1 parent 145f060 commit ce57a4a
Showing 1 changed file with 42 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public <T> Object doIntercept(MethodInvocationContext<Object, Object> context, C
try {
return doIntercept(context, service);
} catch (ResourceNotFoundException ignored) {
return unwrapIfRequired(Flux.from(service.createTable()).map(t -> doIntercept(context, service)), context.getReturnType().getType());
return unwrapIfRequired(Flux.from(service.createTable()).map(t -> doIntercept(context, service)), context);
}
}

Expand Down Expand Up @@ -112,95 +112,97 @@ private <T> Publisher<T> toPublisher(Class<T> type, Argument<?> itemArgument, Ma
private <T> Object doIntercept(MethodInvocationContext<Object, Object> context, AsyncDynamoDbService<T> service) {
String methodName = context.getMethodName();
if (methodName.startsWith("save")) {
return unwrapIfRequired(handleSave(service, context), context.getReturnType().getType());
return unwrapIfRequired(handleSave(service, context), context);
}

if (methodName.startsWith("get") || methodName.startsWith("load")) {
return unwrapIfRequired(handleGet(service, context), context.getReturnType().getType());
return unwrapIfRequired(handleGet(service, context), context);
}

if (context.getTargetMethod().isAnnotationPresent(Query.class)) {
DetachedQuery<T> criteria = functionEvaluator.evaluateAnnotationType(context.getTargetMethod().getAnnotation(Query.class).value(), context);

if (methodName.startsWith("count")) {
return unwrapIfRequired(service.count(criteria), context.getReturnType().getType());
return unwrapIfRequired(service.count(criteria), context);
}

Publisher<T> queryResult = service.query(criteria);
if (methodName.startsWith("delete")) {
return unwrapIfRequired(service.deleteAll(queryResult), context.getReturnType().getType());
return unwrapIfRequired(service.deleteAll(queryResult), context);
}

if (context.getTargetMethod().isAnnotationPresent(Update.class)) {
UpdateBuilder<T, ?> update = (UpdateBuilder<T, ?>) functionEvaluator.evaluateAnnotationType(context.getTargetMethod().getAnnotation(Update.class).value(), context);
return unwrapIfRequired(service.updateAll(queryResult, update), context.getReturnType().getType());
return unwrapIfRequired(service.updateAll(queryResult, update), context);
}

return unwrapIfRequired(queryResult, context.getReturnType().getType());
return unwrapIfRequired(queryResult, context);
}

if (context.getTargetMethod().isAnnotationPresent(Scan.class)) {
DetachedScan<T> criteria = functionEvaluator.evaluateAnnotationType(context.getTargetMethod().getAnnotation(Scan.class).value(), context);

if (methodName.startsWith("count")) {
return unwrapIfRequired(service.count(criteria), context.getReturnType().getType());
return unwrapIfRequired(service.count(criteria), context);
}

Publisher<T> scanResult = service.scan(criteria);

if (methodName.startsWith("delete")) {
return unwrapIfRequired(service.deleteAll(scanResult), context.getReturnType().getType());
return unwrapIfRequired(service.deleteAll(scanResult), context);
}

if (context.getTargetMethod().isAnnotationPresent(Update.class)) {
UpdateBuilder<T, ?> update = (UpdateBuilder<T, ?>) functionEvaluator.evaluateAnnotationType(context.getTargetMethod().getAnnotation(Update.class).value(), context);
return unwrapIfRequired(service.updateAll(scanResult, update), context.getReturnType().getType());
return unwrapIfRequired(service.updateAll(scanResult, update), context);
}

return unwrapIfRequired(scanResult, context.getReturnType().getType());
return unwrapIfRequired(scanResult, context);
}

if (context.getTargetMethod().isAnnotationPresent(Update.class)) {
DetachedUpdate<T, ?> criteria = functionEvaluator.evaluateAnnotationType(context.getTargetMethod().getAnnotation(Update.class).value(), context);

return unwrapIfRequired(service.update(criteria), context.getReturnType().getType());
return unwrapIfRequired(service.update(criteria), context);
}

if (methodName.startsWith("delete")) {
Optional<ItemArgument> maybeItemArgument = ItemArgument.findItemArgument(service.getItemType(), context);
if (maybeItemArgument.isPresent()) {
return unwrapIfRequired(handleDelete(service, context, maybeItemArgument), context.getReturnType().getType());
return unwrapIfRequired(handleDelete(service, context, maybeItemArgument), context);
}
}

if (methodName.startsWith("query") || methodName.startsWith("findAll") || methodName.startsWith("list") || methodName.startsWith("count") || methodName.startsWith("delete")) {
QueryArguments partitionAndSort = QueryArguments.create(context, service.getTable().tableSchema().tableMetadata(), service.getItemType());
if (methodName.startsWith("count")) {
if (partitionAndSort.isCustomized()) {
return unwrapIfRequired(service.countUsingQuery(partitionAndSort.generateQuery(context, conversionService)), context.getReturnType().getType());
return unwrapIfRequired(service.countUsingQuery(partitionAndSort.generateQuery(context, conversionService)), context);
}
return unwrapIfRequired(service.count(partitionAndSort.getPartitionValue(context.getParameters()), partitionAndSort.getSortValue(context.getParameters())), context.getReturnType().getType());
return unwrapIfRequired(service.count(partitionAndSort.getPartitionValue(context.getParameters()), partitionAndSort.getSortValue(context.getParameters())), context);
}
if (methodName.startsWith("delete")) {
if (partitionAndSort.isCustomized()) {
return unwrapIfRequired(service.deleteAll(service.query(partitionAndSort.generateQuery(context, conversionService))), context.getReturnType().getType());
return unwrapIfRequired(service.deleteAll(service.query(partitionAndSort.generateQuery(context, conversionService))), context);
}
Optional<ItemArgument> maybeItemArgument = ItemArgument.findItemArgument(service.getItemType(), context);
return unwrapIfRequired(handleDelete(service, context, maybeItemArgument), context.getReturnType().getType());
return unwrapIfRequired(handleDelete(service, context, maybeItemArgument), context);
}
if (partitionAndSort.isCustomized()) {
return unwrapIfRequired(service.query(partitionAndSort.generateQuery(context, conversionService)), context.getReturnType().getType());
return unwrapIfRequired(service.query(partitionAndSort.generateQuery(context, conversionService)), context);
}
return unwrapIfRequired(
service.findAll(partitionAndSort.getPartitionValue(context.getParameters()), partitionAndSort.getSortValue(context.getParameters())),
context.getReturnType().getType()
context
);
}

throw new UnsupportedOperationException("Cannot implement method " + context.getExecutableMethod().getTargetMethod());
}

private Object unwrapIfRequired(Publisher<?> publisher, Class<Object> type) {
private Object unwrapIfRequired(Publisher<?> publisherWithoutCheckpoint, MethodInvocationContext<Object, Object> context) {
Class<Object> type = context.getReturnType().getType();
Publisher<?> publisher = publisherWithCheckpoint(publisherWithoutCheckpoint, context);
if (void.class.isAssignableFrom(type) || Void.class.isAssignableFrom(type)) {
return Mono.from(publisher).block();
}
Expand Down Expand Up @@ -322,4 +324,24 @@ private <T> Publisher<T> handleGet(AsyncDynamoDbService<T> service, MethodInvoca
return service.get(partitionValue, partitionAndSort.getSortValue(params));
}

private <T> Publisher<T> publisherWithCheckpoint(Publisher<T> publisher, MethodInvocationContext<Object, Object> context) {
if (publisher instanceof Mono<T> mono) {
return monoWithCheckpoint(mono, context);
}

if (publisher instanceof Flux<T> flux) {
return fluxWithCheckpoint(flux, context);
}

return Flux.from(publisher).checkpoint(context.getExecutableMethod().toString(), true);
}

private <T> Mono<T> monoWithCheckpoint(Publisher<T> publisher, MethodInvocationContext<Object, Object> context) {
return Mono.from(publisher).checkpoint(context.getExecutableMethod().toString(), true);
}

private <T> Flux<T> fluxWithCheckpoint(Publisher<T> publisher, MethodInvocationContext<Object, Object> context) {
return Flux.from(publisher).checkpoint(context.getExecutableMethod().toString(), true);
}

}

0 comments on commit ce57a4a

Please sign in to comment.