diff --git a/afs-cassandra/src/main/java/com/powsybl/afs/cassandra/AbstractCassandraOutputStream.java b/afs-cassandra/src/main/java/com/powsybl/afs/cassandra/AbstractCassandraOutputStream.java
new file mode 100644
index 00000000..2f2e87a2
--- /dev/null
+++ b/afs-cassandra/src/main/java/com/powsybl/afs/cassandra/AbstractCassandraOutputStream.java
@@ -0,0 +1,36 @@
+/**
+ * Copyright (c) 2020, RTE (http://www.rte-france.com)
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+package com.powsybl.afs.cassandra;
+
+import com.datastax.driver.core.Session;
+
+import java.io.OutputStream;
+import java.util.Objects;
+import java.util.UUID;
+
+/**
+ * @author Yichen TANG
+ */
+abstract class AbstractCassandraOutputStream extends OutputStream {
+
+    final UUID nodeUuid;
+
+    final String name;
+
+    final int chunkSize;
+
+    final Session session;
+
+    int chunkNum = 0;
+
+    AbstractCassandraOutputStream(UUID nodeUuid, String name, int chunkSize, Session session) {
+        this.nodeUuid = Objects.requireNonNull(nodeUuid);
+        this.name = Objects.requireNonNull(name);
+        this.chunkSize = chunkSize;
+        this.session = Objects.requireNonNull(session);
+    }
+}
diff --git a/afs-cassandra/src/main/java/com/powsybl/afs/cassandra/CassandraAppStorage.java b/afs-cassandra/src/main/java/com/powsybl/afs/cassandra/CassandraAppStorage.java
index f33e9fdd..dd0dcc33 100644
--- a/afs-cassandra/src/main/java/com/powsybl/afs/cassandra/CassandraAppStorage.java
+++ b/afs-cassandra/src/main/java/com/powsybl/afs/cassandra/CassandraAppStorage.java
@@ -840,6 +840,40 @@ private UUID deleteNode(UUID nodeUuid) {
         return parentNodeUuid;
     }
 
+    private final class DecompressOnceBinaryDataInputStream extends InputStream {
+
+        private GZIPInputStream gzis;
+
+        private DecompressOnceBinaryDataInputStream(UUID nodeUuid, String name, Row firstRow) {
+            Objects.requireNonNull(nodeUuid);
+            Objects.requireNonNull(name);
+            Objects.requireNonNull(firstRow);
+            int chunkNum = 0;
+            try {
+                ByteArrayOutputStream tmp = new ByteArrayOutputStream();
+                Row row = firstRow;
+                while (row != null) {
+                    tmp.write(row.getBytes(0).array());
+                    chunkNum++;
+                    ResultSet resultSet = getSession().execute(select(CHUNK).from(NODE_DATA)
+                            .where(eq(ID, nodeUuid))
+                            .and(eq(NAME, name))
+                            .and(eq(CHUNK_NUM, chunkNum)));
+                    row = resultSet.one();
+                }
+                ByteArrayInputStream bais = new ByteArrayInputStream(tmp.toByteArray());
+                gzis = new GZIPInputStream(bais);
+            } catch (IOException e) {
+                throw new UncheckedIOException(e);
+            }
+        }
+
+        @Override
+        public int read() throws IOException {
+            return gzis.read();
+        }
+    }
+
     private final class BinaryDataInputStream extends InputStream {
 
         private final UUID nodeUuid;
@@ -855,6 +889,7 @@ private final class BinaryDataInputStream extends InputStream {
         private BinaryDataInputStream(UUID nodeUuid, String name, Row firstRow) {
             this.nodeUuid = Objects.requireNonNull(nodeUuid);
             this.name = Objects.requireNonNull(name);
+            Objects.requireNonNull(firstRow);
             buffer = new ByteArrayInputStream(firstRow.getBytes(0).array());
             try {
                 gzis = new GZIPInputStream(buffer);
@@ -932,26 +967,44 @@ public Optional<InputStream> readBinaryData(String nodeId, String name) {
             return Optional.empty();
         }
 
-        return Optional.of(new BinaryDataInputStream(nodeUuid, name, firstRow));
+        ResultSet resultSet2 = getSession().execute(select(CHUNK).from(NODE_DATA)
+                .where(eq(ID, nodeUuid))
+                .and(eq(NAME, name))
+                .and(eq(CHUNK_NUM, 1)));
+        if (resultSet2 == null) {
+            return Optional.of(new BinaryDataInputStream(nodeUuid, name, firstRow));
+        }
+        final Row row = resultSet2.one();
+        if (row == null) {
+            return Optional.of(new BinaryDataInputStream(nodeUuid, name, firstRow));
+        }
+        final CompressionMode compressionMode = detectedBySecondChunk(row.getBytes(0).array());
+        if (CompressionMode.ONCE.equals(compressionMode)) {
+            return Optional.of(new DecompressOnceBinaryDataInputStream(nodeUuid, name, firstRow));
+        } else if (CompressionMode.MULTI.equals(compressionMode)) {
+            return Optional.of(new BinaryDataInputStream(nodeUuid, name, firstRow));
+        } else {
+            throw new IllegalArgumentException(compressionMode + " is not supported");
+        }
     }
 
-    private final class BinaryDataOutputStream extends OutputStream {
-
-        private final UUID nodeUuid;
+    private static CompressionMode detectedBySecondChunk(byte[] header) {
+        if (header[0] == (byte) 35615 && header[1] == (byte) (35615 >> 8) && header[2] == 8) {
+            return CompressionMode.MULTI;
+        }
+        return CompressionMode.ONCE;
+    }
 
-        private final String name;
+    private final class BinaryDataOutputStream extends AbstractCassandraOutputStream {
 
         private ByteArrayOutputStream buffer = new ByteArrayOutputStream(config.getBinaryDataChunkSize());
 
         private long count = 0;
 
-        private int chunkNum = 0;
-
         private GZIPOutputStream gzos;
 
         private BinaryDataOutputStream(UUID nodeUuid, String name) {
-            this.nodeUuid = Objects.requireNonNull(nodeUuid);
-            this.name = Objects.requireNonNull(name);
+            super(nodeUuid, name, config.getBinaryDataChunkSize(), getSession());
             try {
                 gzos = new GZIPOutputStream(buffer);
             } catch (IOException e) {
@@ -1008,7 +1061,7 @@ public void write(byte[] b, int off, int len) throws IOException {
 
         @Override
         public void close() {
-            if (chunkNum == 0 || count > 0) { // create at least on chunk even empty
+            if (chunkNum == 0 || count > 0) { // create at least one chunk, even if empty
                 execute();
             }
 
@@ -1022,12 +1075,23 @@ public void close() {
 
     @Override
     public OutputStream writeBinaryData(String nodeId, String name) {
+        return writeBinaryData(nodeId, name, CompressionMode.MULTI);
+    }
+
+    @Override
+    public OutputStream writeBinaryData(String nodeId, String name, CompressionMode mode) {
         UUID nodeUuid = checkNodeId(nodeId);
         Objects.requireNonNull(name);
         // flush buffer to keep change order
         changeBuffer.flush();
         pushEvent(new NodeDataUpdated(nodeId, name), APPSTORAGE_NODE_TOPIC);
-        return new BinaryDataOutputStream(nodeUuid, name);
+        if (CompressionMode.ONCE.equals(mode)) {
+            return new CompressOnceBinaryDataOutputStream(nodeUuid, name, config.getBinaryDataChunkSize(), getSession());
+        } else if (CompressionMode.MULTI.equals(mode)) {
+            return new BinaryDataOutputStream(nodeUuid, name);
+        } else {
+            throw new IllegalArgumentException(mode + " is not supported");
+        }
     }
 
     @Override
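The `detectedBySecondChunk` check above hinges on gzip framing: 35615 is `java.util.zip.GZIPInputStream.GZIP_MAGIC` (0x8b1f), stored little-endian on the wire, and the third byte is the DEFLATED compression-method code (8). In MULTI mode every chunk is an independent gzip stream, so the second chunk also starts with this header; in ONCE mode the second chunk is a slice from the middle of one gzip stream, so it (almost) never does — the detection is a heuristic, since compressed payload bytes could in principle reproduce the 3-byte prefix. A minimal standalone sketch (the `GzipHeaderProbe` class name is illustrative, not part of the patch):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public final class GzipHeaderProbe {

    // Same predicate as detectedBySecondChunk: gzip magic 0x1f 0x8b, then CM == 8 (deflate)
    static boolean looksLikeGzipStart(byte[] chunk) {
        return chunk.length >= 3
                && chunk[0] == (byte) GZIPInputStream.GZIP_MAGIC         // 0x1f (low byte of 0x8b1f)
                && chunk[1] == (byte) (GZIPInputStream.GZIP_MAGIC >> 8)  // 0x8b
                && chunk[2] == 8;                                        // deflate method byte
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (GZIPOutputStream gzos = new GZIPOutputStream(baos)) {
            gzos.write("some node data".getBytes(StandardCharsets.UTF_8));
        }
        byte[] gzipped = baos.toByteArray();
        // true: a chunk that begins its own gzip stream, as every MULTI chunk does
        System.out.println(looksLikeGzipStart(gzipped));
        // false: a mid-stream slice, as a ONCE second chunk would be
        System.out.println(looksLikeGzipStart(Arrays.copyOfRange(gzipped, 5, gzipped.length)));
    }
}
```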
diff --git a/afs-cassandra/src/main/java/com/powsybl/afs/cassandra/CompressOnceBinaryDataOutputStream.java b/afs-cassandra/src/main/java/com/powsybl/afs/cassandra/CompressOnceBinaryDataOutputStream.java
new file mode 100644
index 00000000..67967ab4
--- /dev/null
+++ b/afs-cassandra/src/main/java/com/powsybl/afs/cassandra/CompressOnceBinaryDataOutputStream.java
@@ -0,0 +1,86 @@
+/**
+ * Copyright (c) 2020, RTE (http://www.rte-france.com)
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+package com.powsybl.afs.cassandra;
+
+import com.datastax.driver.core.Session;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import java.util.zip.GZIPOutputStream;
+
+import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
+import static com.powsybl.afs.cassandra.CassandraConstants.*;
+
+/**
+ * @author Yichen TANG
+ */
+final class CompressOnceBinaryDataOutputStream extends AbstractCassandraOutputStream {
+
+    private final ByteArrayOutputStream baos;
+    private final GZIPOutputStream gzos;
+
+    CompressOnceBinaryDataOutputStream(UUID nodeUuid, String name, int chunkSize, Session session) {
+        super(nodeUuid, name, chunkSize, session);
+        baos = new ByteArrayOutputStream();
+        try {
+            gzos = new GZIPOutputStream(baos);
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        gzos.write(b);
+        executeIfNecessary();
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        gzos.write(b, off, len);
+        executeIfNecessary();
+    }
+
+    private void executeIfNecessary() {
+        final int length = baos.size();
+        if (length > (chunkNum + 1) * chunkSize) {
+            writeToDb();
+        }
+    }
+
+    private void writeToDb() {
+        final byte[] bytes = baos.toByteArray();
+        for (int i = chunkNum; i < bytes.length / chunkSize; i++) {
+            int offset = i * chunkSize;
+            session.execute(insertInto(NODE_DATA)
+                    .value(ID, nodeUuid)
+                    .value(NAME, name)
+                    .value(CHUNK_NUM, chunkNum++)
+                    .value(CHUNKS_COUNT, chunkNum)
+                    .value(CHUNK, ByteBuffer.wrap(bytes, offset, chunkSize)));
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        gzos.close();
+        writeToDb();
+        final byte[] bytes = baos.toByteArray();
+        final int remainder = bytes.length % chunkSize;
+        if (remainder != 0) {
+            session.execute(insertInto(NODE_DATA)
+                    .value(ID, nodeUuid)
+                    .value(NAME, name)
+                    .value(CHUNK_NUM, chunkNum++)
+                    .value(CHUNKS_COUNT, chunkNum)
+                    .value(CHUNK, ByteBuffer.wrap(bytes, bytes.length - remainder, remainder)));
+        }
+    }
+}
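The stream above keeps the whole gzipped payload in `baos` and flushes full chunks as it grows, leaving `close()` to write the tail. A standalone sketch of the same slicing arithmetic (the `ChunkSlicer` name is hypothetical): floor(N / chunkSize) full chunks, plus an N % chunkSize tail when the length is not an exact multiple.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public final class ChunkSlicer {

    static List<ByteBuffer> slice(byte[] bytes, int chunkSize) {
        List<ByteBuffer> chunks = new ArrayList<>();
        for (int i = 0; i < bytes.length / chunkSize; i++) {
            chunks.add(ByteBuffer.wrap(bytes, i * chunkSize, chunkSize)); // full chunks
        }
        int remainder = bytes.length % chunkSize;
        if (remainder != 0) {
            chunks.add(ByteBuffer.wrap(bytes, bytes.length - remainder, remainder)); // tail
        }
        return chunks;
    }

    public static void main(String[] args) {
        System.out.println(slice(new byte[10], 4).size()); // 3 chunks: 4 + 4 + 2 bytes
    }
}
```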
diff --git a/afs-cassandra/src/test/java/com/powsybl/afs/cassandra/CassandraDataSplitTest.java b/afs-cassandra/src/test/java/com/powsybl/afs/cassandra/CassandraDataSplitTest.java
index 7f25d675..0941ffe9 100644
--- a/afs-cassandra/src/test/java/com/powsybl/afs/cassandra/CassandraDataSplitTest.java
+++ b/afs-cassandra/src/test/java/com/powsybl/afs/cassandra/CassandraDataSplitTest.java
@@ -7,6 +7,7 @@
 package com.powsybl.afs.cassandra;
 
 import com.google.common.io.ByteStreams;
+import com.powsybl.afs.storage.AppStorage;
 import com.powsybl.afs.storage.InMemoryEventsBus;
 import com.powsybl.afs.storage.NodeGenericMetadata;
 import com.powsybl.afs.storage.NodeInfo;
@@ -66,5 +67,19 @@ public void test() throws IOException {
         assertTrue(storage.removeData(nodeInfo.getId(), "a"));
         assertTrue(storage.getDataNames(nodeInfo.getId()).isEmpty());
         assertFalse(storage.readBinaryData(nodeInfo.getId(), "a").isPresent());
+
+        String data = "aaaaaaaaaabbbbbbbbbbccccccccccddddddddddddddddddeeeeeeeeeeeeeeeeeeeeefffffffffffffffffffffffff";
+        NodeInfo rootNodeId2 = storage.createRootNodeIfNotExists("test", "folder");
+        NodeInfo nodeInfo2 = storage.createNode(rootNodeId2.getId(), "test1", "folder", "", 0, new NodeGenericMetadata());
+        try (OutputStream os = storage.writeBinaryData(nodeInfo2.getId(), "gzipOnce", AppStorage.CompressionMode.ONCE)) {
+            os.write(data.getBytes(StandardCharsets.UTF_8));
+        }
+        storage.flush();
+
+        InputStream is3 = storage.readBinaryData(nodeInfo2.getId(), "gzipOnce").orElse(null);
+        assertNotNull(is3);
+        assertEquals(data, new String(ByteStreams.toByteArray(is3), StandardCharsets.UTF_8));
+        assertTrue(storage.removeData(nodeInfo2.getId(), "gzipOnce"));
+        assertTrue(storage.getDataNames(nodeInfo2.getId()).isEmpty());
     }
 }
diff --git a/afs-ext-base/src/main/java/com/powsybl/afs/ext/base/ImportedCaseBuilder.java b/afs-ext-base/src/main/java/com/powsybl/afs/ext/base/ImportedCaseBuilder.java
index c1dca707..2560d525 100644
--- a/afs-ext-base/src/main/java/com/powsybl/afs/ext/base/ImportedCaseBuilder.java
+++ b/afs-ext-base/src/main/java/com/powsybl/afs/ext/base/ImportedCaseBuilder.java
@@ -56,6 +56,8 @@ public class ImportedCaseBuilder implements ProjectFileBuilder<ImportedCase> {
 
     private Importer importer;
 
+    private AppStorage.CompressionMode mode = AppStorage.CompressionMode.MULTI;
+
     private final Properties parameters = new Properties();
 
     public ImportedCaseBuilder(ProjectFileBuildContext context, ImportersLoader importersLoader, ImportConfig importConfig) {
@@ -122,6 +124,11 @@ public ImportedCaseBuilder withParameters(Map<String, String> parameters) {
         return this;
     }
 
+    public ImportedCaseBuilder setCompressMode(AppStorage.CompressionMode mode) {
+        this.mode = Objects.requireNonNull(mode);
+        return this;
+    }
+
     @Override
     public ImportedCase build() {
         if (dataSource == null) {
@@ -140,7 +147,7 @@ public ImportedCase build() {
                 new NodeGenericMetadata().setString(ImportedCase.FORMAT, importer.getFormat()));
 
         // store case data
-        importer.copy(dataSource, new AppStorageDataSource(context.getStorage(), info.getId(), info.getName()));
+        importer.copy(dataSource, new AppStorageDataSource(context.getStorage(), info.getId(), info.getName(), mode));
 
         // store parameters
         try (Writer writer = new OutputStreamWriter(context.getStorage().writeBinaryData(info.getId(), ImportedCase.PARAMETERS), StandardCharsets.UTF_8)) {
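With the new `setCompressMode` option, an imported case can be stored with single-stream compression. A hedged usage sketch (assuming the usual powsybl-afs entry points, `ProjectFolder.fileBuilder` and `ImportedCaseBuilder.withFile`, which are not shown in this diff; the `ImportWithOnceCompression` class is illustrative):

```java
import com.powsybl.afs.ProjectFolder;
import com.powsybl.afs.ext.base.ImportedCase;
import com.powsybl.afs.ext.base.ImportedCaseBuilder;
import com.powsybl.afs.storage.AppStorage;

import java.nio.file.Path;

public final class ImportWithOnceCompression {

    // Imports a case file into the given project folder; its data blobs are gzipped
    // as one stream (ONCE) instead of the default per-chunk gzip (MULTI).
    static ImportedCase importCase(ProjectFolder folder, Path caseFile) {
        return folder.fileBuilder(ImportedCaseBuilder.class)
                .withFile(caseFile)
                .setCompressMode(AppStorage.CompressionMode.ONCE)
                .build();
    }
}
```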
diff --git a/afs-storage-api/src/main/java/com/powsybl/afs/storage/AppStorage.java b/afs-storage-api/src/main/java/com/powsybl/afs/storage/AppStorage.java
index c8618e13..ad4a572a 100644
--- a/afs-storage-api/src/main/java/com/powsybl/afs/storage/AppStorage.java
+++ b/afs-storage-api/src/main/java/com/powsybl/afs/storage/AppStorage.java
@@ -7,7 +7,9 @@
 package com.powsybl.afs.storage;
 
 import com.powsybl.commons.PowsyblException;
-import com.powsybl.timeseries.*;
+import com.powsybl.timeseries.DoubleDataChunk;
+import com.powsybl.timeseries.StringDataChunk;
+import com.powsybl.timeseries.TimeSeriesMetadata;
 
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -127,6 +129,14 @@ default void renameNode(String nodeId, String name) {
      */
     OutputStream writeBinaryData(String nodeId, String name);
 
+    /**
+     * Returns an {@code OutputStream} to write data associated to the node with ID {@code nodeId}.
+     * The underlying stored bytes will be compressed according to the chosen {@code mode}.
+     */
+    default OutputStream writeBinaryData(String nodeId, String name, CompressionMode mode) {
+        return writeBinaryData(nodeId, name);
+    }
+
     /**
      * Returns {@code true} if data named {@code name} associated with the node with ID {@code nodeId} exists.
      */
@@ -243,4 +253,15 @@ default void renameNode(String nodeId, String name) {
      */
     @Override
     void close();
+
+    enum CompressionMode {
+        /**
+         * The input data will be gzipped first, and every chunk contains a part of the gzipped bytes.
+         */
+        ONCE,
+        /**
+         * The input data will be chunked first, then each chunk will be gzipped and stored.
+         */
+        MULTI
+    }
 }
diff --git a/afs-storage-api/src/main/java/com/powsybl/afs/storage/AppStorageDataSource.java b/afs-storage-api/src/main/java/com/powsybl/afs/storage/AppStorageDataSource.java
index 836c351b..4fb33c2a 100755
--- a/afs-storage-api/src/main/java/com/powsybl/afs/storage/AppStorageDataSource.java
+++ b/afs-storage-api/src/main/java/com/powsybl/afs/storage/AppStorageDataSource.java
@@ -6,6 +6,10 @@
  */
 package com.powsybl.afs.storage;
 
+import com.powsybl.commons.datasource.DataSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -15,11 +19,6 @@
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.powsybl.commons.datasource.DataSource;
-
 /**
  * A datasource corresponding to a data blob stored in the file system.
  * A data blob is associated to a node and a name identifying it among data blobs of this node.
@@ -30,6 +29,10 @@ public class AppStorageDataSource implements DataSource {
 
     private static final String SEPARATOR = "__";
 
+    private static final AppStorage.CompressionMode DEFAULT_COMPRESS_MODE = AppStorage.CompressionMode.MULTI;
+
+    private final AppStorage.CompressionMode compressMode;
+
     public interface Name {
 
         static Name parse(String text) {
@@ -132,9 +135,14 @@ public String toString() {
     private final String nodeName;
 
     public AppStorageDataSource(AppStorage storage, String nodeId, String nodeName) {
+        this(storage, nodeId, nodeName, DEFAULT_COMPRESS_MODE);
+    }
+
+    public AppStorageDataSource(AppStorage storage, String nodeId, String nodeName, AppStorage.CompressionMode mode) {
         this.storage = Objects.requireNonNull(storage);
         this.nodeId = Objects.requireNonNull(nodeId);
         this.nodeName = Objects.requireNonNull(nodeName);
+        this.compressMode = Objects.requireNonNull(mode);
     }
 
     @Override
@@ -147,7 +155,7 @@ public OutputStream newOutputStream(final String suffix, final String ext, boolean append) {
         if (append) {
             throw new UnsupportedOperationException("Append mode not supported");
         }
-        return storage.writeBinaryData(nodeId, new SuffixAndExtension(suffix, ext).toString());
+        return storage.writeBinaryData(nodeId, new SuffixAndExtension(suffix, ext).toString(), compressMode);
     }
 
     @Override
@@ -156,7 +164,7 @@ public OutputStream newOutputStream(String fileName, boolean append) {
         if (append) {
             throw new UnsupportedOperationException("Append mode not supported");
         }
-        return storage.writeBinaryData(nodeId, new FileName(fileName).toString());
+        return storage.writeBinaryData(nodeId, new FileName(fileName).toString(), compressMode);
     }
 
     @Override
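Because `writeBinaryData(nodeId, name, mode)` is a default method that falls back to the two-argument overload, existing `AppStorage` implementations keep compiling unchanged; only backends that can exploit the mode (here, `CassandraAppStorage`) override it, and `readBinaryData` detects on read which mode was used on write. A minimal round-trip sketch (the `CompressionModeRoundTrip` class and the node identifiers are illustrative; `newOutputStream`/`newInputStream` are the pre-existing `DataSource` methods):

```java
import com.google.common.io.ByteStreams;
import com.powsybl.afs.storage.AppStorage;
import com.powsybl.afs.storage.AppStorageDataSource;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

public final class CompressionModeRoundTrip {

    // Writes a blob through a datasource configured for ONCE compression, then reads it
    // back; nodeId/nodeName are assumed to identify an existing node in the storage.
    static void roundTrip(AppStorage storage, String nodeId, String nodeName) throws IOException {
        AppStorageDataSource ds =
                new AppStorageDataSource(storage, nodeId, nodeName, AppStorage.CompressionMode.ONCE);
        try (OutputStream os = ds.newOutputStream("report.txt", false)) {
            os.write("some content".getBytes(StandardCharsets.UTF_8));
        }
        storage.flush();
        try (InputStream is = ds.newInputStream("report.txt")) {
            System.out.println(new String(ByteStreams.toByteArray(is), StandardCharsets.UTF_8));
        }
    }
}
```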