diff --git a/docs/guide/src/docs/asciidoc/dynamodb.adoc b/docs/guide/src/docs/asciidoc/dynamodb.adoc index 15b956ce1..e125679fe 100644 --- a/docs/guide/src/docs/asciidoc/dynamodb.adoc +++ b/docs/guide/src/docs/asciidoc/dynamodb.adoc @@ -1096,3 +1096,28 @@ aws: ---- ==== + +==== Data Loader + +You can export data as CSV from the AWS DynamoDB Console: + +image::export-tables.png[] + +This data can be loaded using the `DynamoDbLoader` bean. The following test shows how to load the data from the CSV file into your DynamoDB tables. Use the integration testing library as described above to have a local DynamoDB started automatically for testing. + +[source,java,indent] +.DynamoDbLoader Example +---- +include::{root-dir}/subprojects/micronaut-amazon-awssdk-dynamodb-loader/src/test/java/com/agorapulse/amazon/awssdk/dynamodb/loader/DynamoDbLoaderTest.java[lines=19..-1] +---- +<1> Test must be annotated with `@MicronautTest` to allow loader injection +<2> Property `aws.dynamodb.create-tables` guarantees that the tables are created automatically +<3> https://agorapulse.github.io/testing-libraries/#_fixt[Fixt] is a very convenient way to keep your test fixtures organized and can be easily used with the loader +<4> Load the data from the CSV file +<5> The mapping is specified as a map with the entity types as keys and iterables of the CSV files as values +<6> After loading the data, the entities corresponding to the rows in the CSV file are available in the database + +This is how the files are laid out for this particular example: + +image:loader-layout.png[] + diff --git a/docs/guide/src/docs/resources/images/export-tables.png b/docs/guide/src/docs/resources/images/export-tables.png new file mode 100644 index 000000000..9c39de98c Binary files /dev/null and b/docs/guide/src/docs/resources/images/export-tables.png differ diff --git a/docs/guide/src/docs/resources/images/loader-layout.png b/docs/guide/src/docs/resources/images/loader-layout.png new file mode 100644 index 000000000..bc9955f1b 
Binary files /dev/null and b/docs/guide/src/docs/resources/images/loader-layout.png differ diff --git a/gradle.properties b/gradle.properties index d99662c0b..cd4b58802 100644 --- a/gradle.properties +++ b/gradle.properties @@ -34,6 +34,7 @@ mockitoVersion=2.23.4 kotlinVersion=1.9.21 kspVersion=1.9.21-1.0.15 grgitVersion=5.0.0 +opencsvVersion=5.10 # other versions creates conflicts in Groovydoc groovyVersion = 4.0.16 diff --git a/subprojects/micronaut-amazon-awssdk-dynamodb-loader/micronaut-amazon-awssdk-dynamodb-loader.gradle b/subprojects/micronaut-amazon-awssdk-dynamodb-loader/micronaut-amazon-awssdk-dynamodb-loader.gradle new file mode 100644 index 000000000..5e4fa4d00 --- /dev/null +++ b/subprojects/micronaut-amazon-awssdk-dynamodb-loader/micronaut-amazon-awssdk-dynamodb-loader.gradle @@ -0,0 +1,26 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * Copyright 2018-2025 Agorapulse. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +dependencies { + api project(':micronaut-amazon-awssdk-dynamodb') + implementation "com.opencsv:opencsv:$opencsvVersion" + implementation 'io.micronaut.reactor:micronaut-reactor' + implementation 'io.micronaut:micronaut-jackson-databind' + + testImplementation project(':micronaut-amazon-awssdk-integration-testing') + testImplementation 'com.agorapulse.testing:fixt:0.2.3' +} diff --git a/subprojects/micronaut-amazon-awssdk-dynamodb-loader/src/main/java/com/agorapulse/amazon/awssdk/dynamodb/loader/CsvDynamoDbLoader.java b/subprojects/micronaut-amazon-awssdk-dynamodb-loader/src/main/java/com/agorapulse/amazon/awssdk/dynamodb/loader/CsvDynamoDbLoader.java new file mode 100644 index 000000000..12629cec2 --- /dev/null +++ b/subprojects/micronaut-amazon-awssdk-dynamodb-loader/src/main/java/com/agorapulse/amazon/awssdk/dynamodb/loader/CsvDynamoDbLoader.java @@ -0,0 +1,166 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * Copyright 2018-2025 Agorapulse. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.agorapulse.amazon.awssdk.dynamodb.loader; + +import com.agorapulse.micronaut.amazon.awssdk.dynamodb.DynamoDBServiceProvider; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.opencsv.CSVReaderHeaderAware; +import com.opencsv.exceptions.CsvValidationException; +import io.micronaut.core.beans.BeanIntrospection; +import io.micronaut.core.beans.BeanProperty; +import io.micronaut.core.convert.ConversionService; +import io.micronaut.core.util.StringUtils; +import jakarta.inject.Named; +import jakarta.inject.Singleton; +import org.reactivestreams.Publisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; +import reactor.core.publisher.ParallelFlux; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; + +import java.io.IOException; +import java.io.StringReader; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import java.util.function.UnaryOperator; + +@Singleton +@Named("csv") +public class CsvDynamoDbLoader implements DynamoDbLoader { + + private static final Logger LOGGER = LoggerFactory.getLogger(CsvDynamoDbLoader.class); + + private final Scheduler scheduler = Schedulers.boundedElastic(); + private final ConversionService conversionService; + private final DynamoDBServiceProvider dynamoDBServiceProvider; + private final ObjectMapper mapper; + + public CsvDynamoDbLoader(ConversionService conversionService, DynamoDBServiceProvider dynamoDBServiceProvider, ObjectMapper mapper) { + this.conversionService = conversionService; + this.dynamoDBServiceProvider = dynamoDBServiceProvider; + this.mapper = mapper; + } + + @Override + @SuppressWarnings("unchecked") + public Publisher load(UnaryOperator fileLoader, Map, Iterable> mappings) { + return (Publisher) 
Flux.fromIterable(mappings.entrySet()).parallel().runOn(scheduler).flatMap(entry -> + Flux.fromIterable(entry.getValue()).parallel().runOn(scheduler).flatMap(filename -> + preload(fileLoader, entry.getKey(), filename) + ) + ).sequential(); + } + + private Flux load(UnaryOperator fileLoader, Class type, String filename) { + BeanIntrospection introspection = BeanIntrospection.getIntrospection(type); + + long tick = System.currentTimeMillis(); + + String text = fileLoader.apply(filename); + + if (text == null) { + LOGGER.error("File {} not found", filename); + return Flux.empty(); + } + + try { + CSVReaderHeaderAware reader = new CSVReaderHeaderAware(new StringReader(text)); + + LOGGER.info("Loading records from {} as {}", filename, type); + + Map> properties = new TreeMap<>(); + Set notFound = new HashSet<>(); + + return Flux.>generate(sink -> { + try { + Map data = reader.readMap(); + if (data == null) { + sink.complete(); + } else { + sink.next(data); + } + } catch (IOException | CsvValidationException e) { + sink.error(e); + } + }).map(data -> { + T insight = introspection.instantiate(); + + data.forEach((header, value) -> { + if (StringUtils.isEmpty(value)) { + return; + } + + if (notFound.contains(header)) { + return; + } + + BeanProperty property = properties.computeIfAbsent(header, h -> { + BeanProperty prop = introspection.getProperty(h).orElse(null); + if (prop == null) { + notFound.add(h); + LOGGER.warn("Property {} not found in {} for file {}", h, type, filename); + } + return prop; + }); + + if (property == null) { + return; + } + + Optional convertedValue; + if (value.startsWith("\"{\"\"")) { + String cleanUpJson = value.substring(1, value.length() - 1).replace("\"\"", "\""); + try { + convertedValue = Optional.of(mapper.readValue(cleanUpJson, property.getType())); + } catch (JsonProcessingException e) { + convertedValue = Optional.empty(); + LOGGER.error("Error parsing JSON: {} for property: {} of {}", cleanUpJson, property.getName(), type); + } + } 
else { + convertedValue = conversionService.convert(value, property.asArgument()); + } + + convertedValue.map(Object.class::cast).ifPresent(o -> property.set(insight, o)); + }); + + return insight; + }).doOnComplete(() -> { + LOGGER.info("Loaded {} records from {} as {} in {} ms", reader.getRecordsRead(), filename, type, System.currentTimeMillis() - tick); + try { + reader.close(); + } catch (IOException e) { + LOGGER.error("Error closing CSV: {}", e.getMessage()); + } + }); + } catch (IOException e) { + LOGGER.error("Error parsing CSV: {}", e.getMessage()); + return Flux.empty(); + } + } + + private ParallelFlux preload(UnaryOperator fileLoader, Class insightType, String fileName) { + return Flux.from(dynamoDBServiceProvider.findOrCreate(insightType).saveAll(load(fileLoader, insightType, fileName))).parallel().runOn(Schedulers.boundedElastic()); + } + +} diff --git a/subprojects/micronaut-amazon-awssdk-dynamodb-loader/src/main/java/com/agorapulse/amazon/awssdk/dynamodb/loader/DynamoDbLoader.java b/subprojects/micronaut-amazon-awssdk-dynamodb-loader/src/main/java/com/agorapulse/amazon/awssdk/dynamodb/loader/DynamoDbLoader.java new file mode 100644 index 000000000..455ce8a67 --- /dev/null +++ b/subprojects/micronaut-amazon-awssdk-dynamodb-loader/src/main/java/com/agorapulse/amazon/awssdk/dynamodb/loader/DynamoDbLoader.java @@ -0,0 +1,52 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * Copyright 2018-2025 Agorapulse. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.agorapulse.amazon.awssdk.dynamodb.loader; + +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; + +import java.util.Map; +import java.util.function.UnaryOperator; + +/** + * Loads data into DynamoDB. + */ +public interface DynamoDbLoader { + + /** + * Loads data into DynamoDB and publishes the loaded items as they are loaded. + * + * @param fileLoader the file loader function that takes the file name and returns the content of the file + * @param mappings the map containing entity classes as keys and iterables of file names as values + * @return the publisher of loaded items + */ + Publisher load(UnaryOperator fileLoader, Map, Iterable> mappings); + + /** + * Loads data into DynamoDB and blocks until the publisher returned by {@code load} has emitted its last item. + * + * @param fileLoader the file loader function that takes the file name and returns the content of the file + * @param mappings the map containing entity classes as keys and iterables of file names as values + */ + default void loadAll(UnaryOperator fileLoader, Map, Iterable> mappings) { + Flux.from(load(fileLoader, mappings)).blockLast(); + } + + + +} diff --git a/subprojects/micronaut-amazon-awssdk-dynamodb-loader/src/test/java/com/agorapulse/amazon/awssdk/dynamodb/loader/DynamoDbLoaderTest.java b/subprojects/micronaut-amazon-awssdk-dynamodb-loader/src/test/java/com/agorapulse/amazon/awssdk/dynamodb/loader/DynamoDbLoaderTest.java new file mode 100644 index 000000000..46455384b --- /dev/null +++ b/subprojects/micronaut-amazon-awssdk-dynamodb-loader/src/test/java/com/agorapulse/amazon/awssdk/dynamodb/loader/DynamoDbLoaderTest.java @@ -0,0 +1,76 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * Copyright 2018-2025 Agorapulse. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.agorapulse.amazon.awssdk.dynamodb.loader; + +import com.agorapulse.micronaut.amazon.awssdk.dynamodb.DynamoDBServiceProvider; +import com.agorapulse.testing.fixt.Fixt; +import io.micronaut.context.annotation.Property; +import io.micronaut.test.extensions.junit5.annotation.MicronautTest; +import jakarta.inject.Inject; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; + +import java.time.Instant; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.*; + +@MicronautTest // <1> +@Property(name = "aws.dynamodb.create-tables", value = "true") // <2> +class DynamoDbLoaderTest { + + private static final Fixt fixt = Fixt.create(DynamoDbLoaderTest.class); // <3> + + @Inject DynamoDbLoader loader; + @Inject DynamoDBServiceProvider provider; + + @Test + void loadIntoDynamoDb() { + TestEntity referenceEntity = getReferenceEntity(); + + Object loadedValue = Flux.from( + loader.load( // <4> + fixt::readText, + Map.of( + TestEntity.class, List.of("test-entity.csv") // <5> + ) + ) + ).blockLast(); + + assertInstanceOf(TestEntity.class, loadedValue); + + assertEquals(referenceEntity, loadedValue); + + TestEntity fromDb = provider.findOrCreate(TestEntity.class).get("1", null); // <6> + assertEquals(referenceEntity, fromDb); + } + + private static TestEntity getReferenceEntity() { + TestEntity referenceEntity = new TestEntity(); + referenceEntity.setId("1"); + referenceEntity.setName("test-one"); + referenceEntity.setActive(true); + 
referenceEntity.setCreated(Instant.parse("2019-01-01T00:00:00Z")); + referenceEntity.setCount(2); + referenceEntity.setValue(3.4); + referenceEntity.setData(Map.of("string", "text")); + return referenceEntity; + } + +} diff --git a/subprojects/micronaut-amazon-awssdk-dynamodb-loader/src/test/java/com/agorapulse/amazon/awssdk/dynamodb/loader/TestEntity.java b/subprojects/micronaut-amazon-awssdk-dynamodb-loader/src/test/java/com/agorapulse/amazon/awssdk/dynamodb/loader/TestEntity.java new file mode 100644 index 000000000..23f1b825d --- /dev/null +++ b/subprojects/micronaut-amazon-awssdk-dynamodb-loader/src/test/java/com/agorapulse/amazon/awssdk/dynamodb/loader/TestEntity.java @@ -0,0 +1,120 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * Copyright 2018-2025 Agorapulse. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.agorapulse.amazon.awssdk.dynamodb.loader; + +import com.agorapulse.micronaut.amazon.awssdk.dynamodb.annotation.PartitionKey; +import io.micronaut.core.annotation.Introspected; + +import java.time.Instant; +import java.util.Map; +import java.util.Objects; + +@Introspected +public class TestEntity { + + @PartitionKey + private String id; + private String name; + private Instant created; + private boolean active; + private int count; + private double value; + private Map data; + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public Instant getCreated() { + return created; + } + + public void setCreated(Instant created) { + this.created = created; + } + + public boolean isActive() { + return active; + } + + public void setActive(boolean active) { + this.active = active; + } + + public int getCount() { + return count; + } + + public void setCount(int count) { + this.count = count; + } + + public double getValue() { + return value; + } + + public void setValue(double value) { + this.value = value; + } + + public Map getData() { + return data; + } + + public void setData(Map data) { + this.data = data; + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + TestEntity that = (TestEntity) o; + return active == that.active && count == that.count && Double.compare(value, that.value) == 0 && Objects.equals(id, that.id) && Objects.equals(name, that.name) && Objects.equals(created, that.created) && Objects.equals(data, that.data); + } + + @Override + public int hashCode() { + return Objects.hash(id, name, created, active, count, value, data); + } + + @Override + public String toString() { + return "TestEntity{" + "id='" + id + '\'' + + ", name='" + name + '\'' + + ", created=" + created + + ", active=" + active + + ", count=" + 
count + + ", value=" + value + + ", data=" + data + + '}'; + } +} diff --git a/subprojects/micronaut-amazon-awssdk-dynamodb-loader/src/test/resources/com/agorapulse/amazon/awssdk/dynamodb/loader/DynamoDbLoaderTest/test-entity.csv b/subprojects/micronaut-amazon-awssdk-dynamodb-loader/src/test/resources/com/agorapulse/amazon/awssdk/dynamodb/loader/DynamoDbLoaderTest/test-entity.csv new file mode 100644 index 000000000..177fd7b25 --- /dev/null +++ b/subprojects/micronaut-amazon-awssdk-dynamodb-loader/src/test/resources/com/agorapulse/amazon/awssdk/dynamodb/loader/DynamoDbLoaderTest/test-entity.csv @@ -0,0 +1,2 @@ +"id","name","created","active","count","value","data" +"1","test-one","2019-01-01T00:00:00Z","true","2","3.4","""{""""string"""":""""text""""}"""