[WIP] A new interface method with an option to choose compress mode when wr… #61
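This draft adds an AppStorage.CompressionMode option (ONCE or MULTI) to writeBinaryData, plus a Cassandra implementation: a shared AbstractCassandraOutputStream base class, a CompressOnceBinaryDataOutputStream that gzips the whole stream before chunking it, a DecompressOnceBinaryDataInputStream to read it back, read-path mode detection based on the gzip header of the second chunk, and a setCompressMode option on ImportedCaseBuilder. The existing behavior (MULTI, one gzip stream per chunk) remains the default.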

Open · wants to merge 2 commits into main
AbstractCassandraOutputStream.java (new file)
@@ -0,0 +1,36 @@
/**
* Copyright (c) 2020, RTE (http://www.rte-france.com)
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
package com.powsybl.afs.cassandra;

import com.datastax.driver.core.Session;

import java.io.OutputStream;
import java.util.Objects;
import java.util.UUID;

/**
* @author Yichen TANG <yichen.tang at rte-france.com>
*/
abstract class AbstractCassandraOutputStream extends OutputStream {

final UUID nodeUuid;

final String name;

final int chunkSize;

final Session session;

int chunkNum = 0;

AbstractCassandraOutputStream(UUID nodeUuid, String name, int chunkSize, Session session) {
this.nodeUuid = Objects.requireNonNull(nodeUuid);
this.name = Objects.requireNonNull(name);
this.chunkSize = chunkSize;
this.session = Objects.requireNonNull(session);
}
}
CassandraAppStorage.java
@@ -840,6 +840,40 @@ private UUID deleteNode(UUID nodeUuid) {
return parentNodeUuid;
}

private final class DecompressOnceBinaryDataInputStream extends InputStream {

private GZIPInputStream gzis;

private DecompressOnceBinaryDataInputStream(UUID nodeUuid, String name, Row firstRow) {
Objects.requireNonNull(nodeUuid);
Objects.requireNonNull(name);
Objects.requireNonNull(firstRow);
int chunkNum = 0;
try {
ByteArrayOutputStream tmp = new ByteArrayOutputStream();
Row row = firstRow;
while (row != null) {
tmp.write(row.getBytes(0).array());
chunkNum++;
ResultSet resultSet = getSession().execute(select(CHUNK).from(NODE_DATA)
.where(eq(ID, nodeUuid))
.and(eq(NAME, name))
.and(eq(CHUNK_NUM, chunkNum)));
row = resultSet.one();
}
ByteArrayInputStream bais = new ByteArrayInputStream(tmp.toByteArray());
gzis = new GZIPInputStream(bais);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

@Override
public int read() throws IOException {
return gzis.read();
}
}
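// Note: this reader buffers every chunk in memory before decompression starts.
// A possible streaming alternative (a sketch, not part of this PR) would
// concatenate the chunks lazily with java.io.SequenceInputStream and wrap the
// result in a single GZIPInputStream; it assumes the same query helpers and
// column constants used above, plus a java.util.Enumeration import.
//
// private InputStream streamingDecompressOnce(UUID nodeUuid, String name, Row firstRow) throws IOException {
//     Enumeration<InputStream> chunks = new Enumeration<InputStream>() {
//         private Row next = firstRow;  // first chunk was already fetched by the caller
//         private int chunkNum = 0;
//
//         @Override
//         public boolean hasMoreElements() {
//             return next != null;
//         }
//
//         @Override
//         public InputStream nextElement() {
//             InputStream current = new ByteArrayInputStream(next.getBytes(0).array());
//             chunkNum++;
//             next = getSession().execute(select(CHUNK).from(NODE_DATA)
//                     .where(eq(ID, nodeUuid))
//                     .and(eq(NAME, name))
//                     .and(eq(CHUNK_NUM, chunkNum))).one();
//             return current;
//         }
//     };
//     return new GZIPInputStream(new SequenceInputStream(chunks));
// }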

private final class BinaryDataInputStream extends InputStream {

private final UUID nodeUuid;
@@ -855,6 +889,7 @@ private final class BinaryDataInputStream extends InputStream {
private BinaryDataInputStream(UUID nodeUuid, String name, Row firstRow) {
this.nodeUuid = Objects.requireNonNull(nodeUuid);
this.name = Objects.requireNonNull(name);
Objects.requireNonNull(firstRow);
buffer = new ByteArrayInputStream(firstRow.getBytes(0).array());
try {
gzis = new GZIPInputStream(buffer);
@@ -932,26 +967,44 @@ public Optional<InputStream> readBinaryData(String nodeId, String name) {
return Optional.empty();
}

return Optional.of(new BinaryDataInputStream(nodeUuid, name, firstRow));
ResultSet resultSet2 = getSession().execute(select(CHUNK).from(NODE_DATA)
.where(eq(ID, nodeUuid))
.and(eq(NAME, name))
.and(eq(CHUNK_NUM, 1)));
if (resultSet2 == null) {
return Optional.of(new BinaryDataInputStream(nodeUuid, name, firstRow));
}
final Row row = resultSet2.one();
if (row == null) {
return Optional.of(new BinaryDataInputStream(nodeUuid, name, firstRow));
}
final CompressionMode compressionMode = detectedBySecondChunk(row.getBytes(0).array());
if (CompressionMode.ONCE.equals(compressionMode)) {
return Optional.of(new DecompressOnceBinaryDataInputStream(nodeUuid, name, firstRow));
} else if (CompressionMode.MULTI.equals(compressionMode)) {
return Optional.of(new BinaryDataInputStream(nodeUuid, name, firstRow));
} else {
throw new IllegalArgumentException(compressionMode + " is not supported");
}
}
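// Mode detection on the read path: the first chunk starts with a gzip header in
// both modes, so the second chunk (CHUNK_NUM == 1) is probed instead. If it also
// starts with a gzip header, every chunk is an independent gzip stream (MULTI);
// otherwise the chunks are slices of a single gzip stream (ONCE). Data with a
// single chunk falls back to the legacy reader, which handles both modes, since
// a lone ONCE chunk is itself a complete gzip stream.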

private final class BinaryDataOutputStream extends OutputStream {

private final UUID nodeUuid;
private static CompressionMode detectedBySecondChunk(byte[] header) {
// 35615 == 0x8b1f == GZIPInputStream.GZIP_MAGIC, stored little-endian (0x1f, 0x8b),
// followed by CM == 8 (deflate): the signature of a self-contained gzip member.
if (header[0] == (byte) GZIPInputStream.GZIP_MAGIC && header[1] == (byte) (GZIPInputStream.GZIP_MAGIC >> 8) && header[2] == 8) {
return CompressionMode.MULTI;
}
return CompressionMode.ONCE;
}

private final String name;
private final class BinaryDataOutputStream extends AbstractCassandraOutputStream {

private ByteArrayOutputStream buffer = new ByteArrayOutputStream(config.getBinaryDataChunkSize());

private long count = 0;

private int chunkNum = 0;

private GZIPOutputStream gzos;

private BinaryDataOutputStream(UUID nodeUuid, String name) {
this.nodeUuid = Objects.requireNonNull(nodeUuid);
this.name = Objects.requireNonNull(name);
super(nodeUuid, name, config.getBinaryDataChunkSize(), getSession());
try {
gzos = new GZIPOutputStream(buffer);
} catch (IOException e) {
@@ -1008,7 +1061,7 @@ public void write(byte[] b, int off, int len) throws IOException {

@Override
public void close() {
if (chunkNum == 0 || count > 0) { // create at least on chunk even empty
if (chunkNum == 0 || count > 0) { // create at least one chunk, even if empty
execute();
}

@@ -1022,12 +1075,23 @@ public void close() {

@Override
public OutputStream writeBinaryData(String nodeId, String name) {
return writeBinaryData(nodeId, name, CompressionMode.MULTI);
}

@Override
public OutputStream writeBinaryData(String nodeId, String name, CompressionMode mode) {
UUID nodeUuid = checkNodeId(nodeId);
Objects.requireNonNull(name);
// flush buffer to keep change order
changeBuffer.flush();
pushEvent(new NodeDataUpdated(nodeId, name), APPSTORAGE_NODE_TOPIC);
return new BinaryDataOutputStream(nodeUuid, name);
if (CompressionMode.ONCE.equals(mode)) {
return new CompressOnceBinaryDataOutputStream(nodeUuid, name, config.getBinaryDataChunkSize(), getSession());
} else if (CompressionMode.MULTI.equals(mode)) {
return new BinaryDataOutputStream(nodeUuid, name);
} else {
throw new IllegalArgumentException(mode + " is not supported");
}
}

@Override
CompressOnceBinaryDataOutputStream.java (new file)
@@ -0,0 +1,86 @@
/**
* Copyright (c) 2020, RTE (http://www.rte-france.com)
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
package com.powsybl.afs.cassandra;

import com.datastax.driver.core.Session;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.UUID;
import java.util.zip.GZIPOutputStream;

import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
import static com.powsybl.afs.cassandra.CassandraConstants.*;

/**
* @author Yichen TANG <yichen.tang at rte-france.com>
*/
final class CompressOnceBinaryDataOutputStream extends AbstractCassandraOutputStream {

private final ByteArrayOutputStream baos;
private final GZIPOutputStream gzos;

CompressOnceBinaryDataOutputStream(UUID nodeUuid, String name, int chunkSize, Session session) {
super(nodeUuid, name, chunkSize, session);
baos = new ByteArrayOutputStream();
try {
gzos = new GZIPOutputStream(baos);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

@Override
public void write(int b) throws IOException {
gzos.write(b);
executeIfNecessary();
}

@Override
public void write(byte[] b, int off, int len) throws IOException {
gzos.write(b, off, len);
executeIfNecessary();
}

private void executeIfNecessary() {
// flush once the compressed buffer has outgrown the next chunk boundary
final int length = baos.size();
if (length > (chunkNum + 1) * chunkSize) {
writeToDb();
}
}

private void writeToDb() {
// baos accumulates the whole compressed stream (it is never reset), so chunk
// offsets are absolute; store every complete chunk not yet written.
final byte[] bytes = baos.toByteArray();
for (int i = chunkNum; i < bytes.length / chunkSize; i++) {
int offset = i * chunkSize;
session.execute(insertInto(NODE_DATA)
.value(ID, nodeUuid)
.value(NAME, name)
.value(CHUNK_NUM, chunkNum++)
.value(CHUNKS_COUNT, chunkNum)
.value(CHUNK, ByteBuffer.wrap(bytes, offset, chunkSize)));
}
}
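// Worked example (chunkSize = 10): once the compressed stream holds 25 bytes,
// writeToDb() stores chunks 0 and 1 (bytes 0-9 and 10-19); close() later
// flushes the trailing 5 bytes as chunk 2.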

@Override
public void close() throws IOException {
gzos.close(); // flushes remaining deflate output and the gzip trailer into baos
writeToDb();
final byte[] bytes = baos.toByteArray();
final int remainder = bytes.length % chunkSize;
if (remainder != 0) {
session.execute(insertInto(NODE_DATA)
.value(ID, nodeUuid)
.value(NAME, name)
.value(CHUNK_NUM, chunkNum++)
.value(CHUNKS_COUNT, chunkNum)
.value(CHUNK, ByteBuffer.wrap(bytes, bytes.length - remainder, remainder)));
}
}
}
CassandraAppStorage test (file name not shown)
@@ -7,6 +7,7 @@
package com.powsybl.afs.cassandra;

import com.google.common.io.ByteStreams;
import com.powsybl.afs.storage.AppStorage;
import com.powsybl.afs.storage.InMemoryEventsBus;
import com.powsybl.afs.storage.NodeGenericMetadata;
import com.powsybl.afs.storage.NodeInfo;
@@ -66,5 +67,19 @@ public void test() throws IOException {
assertTrue(storage.removeData(nodeInfo.getId(), "a"));
assertTrue(storage.getDataNames(nodeInfo.getId()).isEmpty());
assertFalse(storage.readBinaryData(nodeInfo.getId(), "a").isPresent());

String data = "aaaaaaaaaabbbbbbbbbbccccccccccddddddddddddddddddeeeeeeeeeeeeeeeeeeeeefffffffffffffffffffffffff";
NodeInfo rootNodeId2 = storage.createRootNodeIfNotExists("test", "folder");
NodeInfo nodeInfo2 = storage.createNode(rootNodeId2.getId(), "test1", "folder", "", 0, new NodeGenericMetadata());
try (OutputStream os = storage.writeBinaryData(nodeInfo2.getId(), "gzipOnce", AppStorage.CompressionMode.ONCE)) {
os.write(data.getBytes(StandardCharsets.UTF_8));
}
storage.flush();

InputStream is3 = storage.readBinaryData(nodeInfo2.getId(), "gzipOnce").orElse(null);
assertNotNull(is3);
assertEquals(data, new String(ByteStreams.toByteArray(is3), StandardCharsets.UTF_8));
assertTrue(storage.removeData(nodeInfo2.getId(), "gzipOnce"));
assertTrue(storage.getDataNames(nodeInfo2.getId()).isEmpty());
}
}
ImportedCaseBuilder.java
@@ -56,6 +56,8 @@ public class ImportedCaseBuilder implements ProjectFileBuilder<ImportedCase> {

private Importer importer;

private AppStorage.CompressionMode mode = AppStorage.CompressionMode.MULTI;

private final Properties parameters = new Properties();

public ImportedCaseBuilder(ProjectFileBuildContext context, ImportersLoader importersLoader, ImportConfig importConfig) {
@@ -122,6 +124,11 @@ public ImportedCaseBuilder withParameters(Map<String, String> parameters) {
return this;
}

public ImportedCaseBuilder setCompressMode(AppStorage.CompressionMode mode) {
this.mode = Objects.requireNonNull(mode);
return this;
}

@Override
public ImportedCase build() {
if (dataSource == null) {
@@ -140,7 +147,7 @@ public ImportedCase build() {
new NodeGenericMetadata().setString(ImportedCase.FORMAT, importer.getFormat()));

// store case data
importer.copy(dataSource, new AppStorageDataSource(context.getStorage(), info.getId(), info.getName()));
importer.copy(dataSource, new AppStorageDataSource(context.getStorage(), info.getId(), info.getName(), mode));

// store parameters
try (Writer writer = new OutputStreamWriter(context.getStorage().writeBinaryData(info.getId(), ImportedCase.PARAMETERS), StandardCharsets.UTF_8)) {
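A caller-side sketch of the new builder option (hypothetical folder and aCase variables; fileBuilder and withCase are the existing powsybl-afs project APIs):

    ImportedCase importedCase = folder.fileBuilder(ImportedCaseBuilder.class)
            .withCase(aCase)
            .setCompressMode(AppStorage.CompressionMode.ONCE)
            .build();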
AppStorage.java
@@ -7,7 +7,9 @@
package com.powsybl.afs.storage;

import com.powsybl.commons.PowsyblException;
import com.powsybl.timeseries.*;
import com.powsybl.timeseries.DoubleDataChunk;
import com.powsybl.timeseries.StringDataChunk;
import com.powsybl.timeseries.TimeSeriesMetadata;

import java.io.InputStream;
import java.io.OutputStream;
@@ -127,6 +129,14 @@ default void renameNode(String nodeId, String name) {
*/
OutputStream writeBinaryData(String nodeId, String name);

/**
 * Returns an {@code OutputStream} to write data associated with the node with ID {@code nodeId}.
 * The underlying stored bytes are compressed according to the chosen {@code mode}.
 * The default implementation ignores {@code mode} and delegates to {@link #writeBinaryData(String, String)}.
 */
default OutputStream writeBinaryData(String nodeId, String name, CompressionMode mode) {
return writeBinaryData(nodeId, name);
}

/**
* Returns {@code true} if data named {@code name} associated with the node with ID {@code nodeId} exists.
*/
@@ -243,4 +253,15 @@ default void renameNode(String nodeId, String name) {
*/
@Override
void close();

enum CompressionMode {
/**
 * The input data is gzipped as one stream, and each chunk stores a slice of the compressed bytes.
 */
ONCE,
/**
 * The input data is chunked first, then each chunk is gzipped and stored independently.
 */
MULTI
}
}
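A minimal usage sketch of the two modes (hypothetical storage, nodeId, and payload variables; MULTI is the default used by the two-argument overload):

    // MULTI (default): each stored chunk is an independent gzip stream
    try (OutputStream os = storage.writeBinaryData(nodeId, "data")) {
        os.write(payload);
    }

    // ONCE: the whole stream is gzipped once, then split into chunks
    try (OutputStream os = storage.writeBinaryData(nodeId, "data", AppStorage.CompressionMode.ONCE)) {
        os.write(payload);
    }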