Skip to content

Commit

Permalink
DynamoDB loader
Browse files Browse the repository at this point in the history
  • Loading branch information
musketyr committed Jan 16, 2025
1 parent d3d0dae commit 205d23c
Show file tree
Hide file tree
Showing 10 changed files with 468 additions and 0 deletions.
25 changes: 25 additions & 0 deletions docs/guide/src/docs/asciidoc/dynamodb.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1096,3 +1096,28 @@ aws:
----
====

==== Data Loader

You can load data exported as CSV from the AWS DynamoDB Console:

image::export-tables.png[]

This data can be loaded using the `DynamoDbLoader` bean. The following test shows how to load data from CSV files into your DynamoDB tables. Use the integration testing library described above to get an automatic local DynamoDB for testing.

[source,java,indent]
.DynamoDbLoader Example
----
include::{root-dir}/subprojects/micronaut-amazon-awssdk-dynamodb-loader/src/test/java/com/agorapulse/amazon/awssdk/dynamodb/loader/DynamoDbLoaderTest.java[lines=19..-1]
----
<1> Test must be annotated with `@MicronautTest` to allow loader injection
<2> Property `aws.dynamodb.create-tables` guarantees that the tables are created automatically
<3> https://agorapulse.github.io/testing-libraries/#_fixt[Fixt] is a very convenient way to keep your test fixtures organized and can be easily used with the loader
<4> Load the data from the CSV file
<5> The mapping is specified as a map with the entity types as keys and iterables of the CSV file names as values
<6> After loading the data, the entities according to the rows in the CSV file are available in the database

This is how the files are laid out for this particular example:

image::loader-layout.png[]

Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
1 change: 1 addition & 0 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ mockitoVersion=2.23.4
kotlinVersion=1.9.21
kspVersion=1.9.21-1.0.15
grgitVersion=5.0.0
opencsvVersion=5.10

# other versions creates conflicts in Groovydoc
groovyVersion = 4.0.16
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* Copyright 2018-2025 Agorapulse.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
dependencies {
    // core declarative DynamoDB support that the loader feeds entities into
    api project(':micronaut-amazon-awssdk-dynamodb')
    // parses the CSV files exported from the AWS DynamoDB Console
    implementation "com.opencsv:opencsv:$opencsvVersion"
    // Reactor types (Flux/ParallelFlux) used by the loader's reactive pipeline
    implementation 'io.micronaut.reactor:micronaut-reactor'
    // Jackson ObjectMapper for parsing JSON-valued CSV columns
    implementation 'io.micronaut:micronaut-jackson-databind'

    // local DynamoDB for the loader integration test
    testImplementation project(':micronaut-amazon-awssdk-integration-testing')
    // Fixt keeps the CSV test fixtures organized next to the test class
    testImplementation 'com.agorapulse.testing:fixt:0.2.3'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* Copyright 2018-2025 Agorapulse.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.agorapulse.amazon.awssdk.dynamodb.loader;

import com.agorapulse.micronaut.amazon.awssdk.dynamodb.DynamoDBServiceProvider;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.opencsv.CSVReaderHeaderAware;
import com.opencsv.exceptions.CsvValidationException;
import io.micronaut.core.beans.BeanIntrospection;
import io.micronaut.core.beans.BeanProperty;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.core.util.StringUtils;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.ParallelFlux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.io.IOException;
import java.io.StringReader;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.UnaryOperator;

@Singleton
@Named("csv")
public class CsvDynamoDbLoader implements DynamoDbLoader {

private static final Logger LOGGER = LoggerFactory.getLogger(CsvDynamoDbLoader.class);

private final Scheduler scheduler = Schedulers.boundedElastic();
private final ConversionService conversionService;
private final DynamoDBServiceProvider dynamoDBServiceProvider;
private final ObjectMapper mapper;

public CsvDynamoDbLoader(ConversionService conversionService, DynamoDBServiceProvider dynamoDBServiceProvider, ObjectMapper mapper) {
this.conversionService = conversionService;
this.dynamoDBServiceProvider = dynamoDBServiceProvider;
this.mapper = mapper;
}

@Override
@SuppressWarnings("unchecked")
public Publisher<Object> load(UnaryOperator<String> fileLoader, Map<Class<?>, Iterable<String>> mappings) {
return (Publisher<Object>) Flux.fromIterable(mappings.entrySet()).parallel().runOn(scheduler).flatMap(entry ->
Flux.fromIterable(entry.getValue()).parallel().runOn(scheduler).flatMap(filename ->
preload(fileLoader, entry.getKey(), filename)
)
).sequential();
}

private <T> Flux<T> load(UnaryOperator<String> fileLoader, Class<T> type, String filename) {
BeanIntrospection<T> introspection = BeanIntrospection.getIntrospection(type);

long tick = System.currentTimeMillis();

String text = fileLoader.apply(filename);

if (text == null) {
LOGGER.error("File {} not found", filename);
return Flux.empty();
}

try {
CSVReaderHeaderAware reader = new CSVReaderHeaderAware(new StringReader(text));

LOGGER.info("Loading records from {} as {}", filename, type);

Map<String, BeanProperty<T, Object>> properties = new TreeMap<>();
Set<String> notFound = new HashSet<>();

return Flux.<Map<String, String>>generate(sink -> {
try {
Map<String, String> data = reader.readMap();
if (data == null) {
sink.complete();
} else {
sink.next(data);
}
} catch (IOException | CsvValidationException e) {
sink.error(e);
}
}).map(data -> {
T insight = introspection.instantiate();

data.forEach((header, value) -> {
if (StringUtils.isEmpty(value)) {
return;
}

if (notFound.contains(header)) {
return;
}

BeanProperty<T, Object> property = properties.computeIfAbsent(header, h -> {
BeanProperty<T, Object> prop = introspection.getProperty(h).orElse(null);
if (prop == null) {
notFound.add(h);
LOGGER.warn("Property {} not found in {} for file {}", h, type, filename);
}
return prop;
});

if (property == null) {
return;
}

Optional<?> convertedValue;
if (value.startsWith("\"{\"\"")) {
String cleanUpJson = value.substring(1, value.length() - 1).replace("\"\"", "\"");
try {
convertedValue = Optional.of(mapper.readValue(cleanUpJson, property.getType()));
} catch (JsonProcessingException e) {
convertedValue = Optional.empty();
LOGGER.error("Error parsing JSON: {} for property: {} of {}", cleanUpJson, property.getName(), type);
}
} else {
convertedValue = conversionService.convert(value, property.asArgument());
}

convertedValue.map(Object.class::cast).ifPresent(o -> property.set(insight, o));
});

return insight;
}).doOnComplete(() -> {
LOGGER.info("Loaded {} records from {} as {} in {} ms", reader.getRecordsRead(), filename, type, System.currentTimeMillis() - tick);
try {
reader.close();
} catch (IOException e) {
LOGGER.error("Error closing CSV: {}", e.getMessage());
}
});
} catch (IOException e) {
LOGGER.error("Error parsing CSV: {}", e.getMessage());
return Flux.empty();
}
}

private <T> ParallelFlux<T> preload(UnaryOperator<String> fileLoader, Class<T> insightType, String fileName) {
return Flux.from(dynamoDBServiceProvider.findOrCreate(insightType).saveAll(load(fileLoader, insightType, fileName))).parallel().runOn(Schedulers.boundedElastic());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* Copyright 2018-2025 Agorapulse.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.agorapulse.amazon.awssdk.dynamodb.loader;

import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

import java.util.Map;
import java.util.function.UnaryOperator;

/**
* Loads data into DynamoDB.
*/
/**
 * Loads data into DynamoDB.
 */
public interface DynamoDbLoader {

    /**
     * Loads data into DynamoDB and publishes the loaded items as they are loaded.
     *
     * @param fileLoader function that takes a file name and returns the content of the file ({@code null} when the file is missing)
     * @param mappings map with entity classes as keys and the names of the files to load for each class as values
     * @return the publisher of loaded items
     */
    Publisher<Object> load(UnaryOperator<String> fileLoader, Map<Class<?>, Iterable<String>> mappings);

    /**
     * Loads data into DynamoDB and blocks until the loading completes.
     *
     * @param fileLoader function that takes a file name and returns the content of the file ({@code null} when the file is missing)
     * @param mappings map with entity classes as keys and the names of the files to load for each class as values
     */
    default void loadAll(UnaryOperator<String> fileLoader, Map<Class<?>, Iterable<String>> mappings) {
        Flux.from(load(fileLoader, mappings)).blockLast();
    }



}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* Copyright 2018-2025 Agorapulse.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.agorapulse.amazon.awssdk.dynamodb.loader;

import com.agorapulse.micronaut.amazon.awssdk.dynamodb.DynamoDBServiceProvider;
import com.agorapulse.testing.fixt.Fixt;
import io.micronaut.context.annotation.Property;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;

import java.time.Instant;
import java.util.List;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.*;

@MicronautTest // <1>
@Property(name = "aws.dynamodb.create-tables", value = "true") // <2>
class DynamoDbLoaderTest {

    private static final Fixt fixt = Fixt.create(DynamoDbLoaderTest.class); // <3>

    @Inject DynamoDbLoader loader;
    @Inject DynamoDBServiceProvider provider;

    /**
     * Loads a single CSV fixture and verifies the parsed entity both from the publisher
     * and from the DynamoDB table itself.
     */
    @Test
    void loadIntoDynamoDb() {
        TestEntity expected = getReferenceEntity();

        Object lastLoaded = Flux.from(
            loader.load( // <4>
                fixt::readText,
                Map.of(
                    TestEntity.class, List.of("test-entity.csv") // <5>
                )
            )
        ).blockLast();

        assertInstanceOf(TestEntity.class, lastLoaded);
        assertEquals(expected, lastLoaded);

        TestEntity stored = provider.findOrCreate(TestEntity.class).get("1", null); // <6>
        assertEquals(expected, stored);
    }

    // the entity matching the single row of test-entity.csv
    private static TestEntity getReferenceEntity() {
        TestEntity entity = new TestEntity();
        entity.setId("1");
        entity.setName("test-one");
        entity.setActive(true);
        entity.setCreated(Instant.parse("2019-01-01T00:00:00Z"));
        entity.setCount(2);
        entity.setValue(3.4);
        entity.setData(Map.of("string", "text"));
        return entity;
    }

}
Loading

0 comments on commit 205d23c

Please sign in to comment.