diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntry.java b/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntry.java
index eb1e9afc3..90b980b98 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntry.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntry.java
@@ -187,6 +187,7 @@ public static class Header {
public static final int COMPRESSION_CODEC_MASK = 0x3;
public static final int COMPRESSION_CODEC_NONE = 0x0;
public static final int COMPRESSION_CODEC_LZ4 = 0x1;
+ public static final int COMPRESSION_CODEC_ZSTD = 0x2;
private int flags = 0;
private int decompressedSize = 0;
@@ -213,6 +214,10 @@ public Header(CompressionCodec.Type compressionType,
this.flags = (int) BitMaskUtils.set(flags, COMPRESSION_CODEC_MASK,
COMPRESSION_CODEC_LZ4);
break;
+ case ZSTD:
+ this.flags = (int) BitMaskUtils.set(flags, COMPRESSION_CODEC_MASK,
+ COMPRESSION_CODEC_ZSTD);
+ break;
default:
throw new RuntimeException(String.format("Unknown Compression Type: %s",
compressionType));
@@ -233,6 +238,8 @@ private void read(DataInputStream in) throws IOException {
this.compressionType = CompressionCodec.Type.NONE;
} else if (compressionType == COMPRESSION_CODEC_LZ4) {
this.compressionType = CompressionCodec.Type.LZ4;
+ } else if (compressionType == COMPRESSION_CODEC_ZSTD) {
+ this.compressionType = CompressionCodec.Type.ZSTD;
} else {
throw new IOException(String.format("Unsupported Compression Type: %s",
compressionType));
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestEnvelopedEntry.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestEnvelopedEntry.java
index 6d78f0370..07377d108 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestEnvelopedEntry.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestEnvelopedEntry.java
@@ -63,9 +63,18 @@ public void testEnvelope() throws Exception {
@Test(timeout = 20000)
public void testLZ4Compression() throws Exception {
+ testCompression(CompressionCodec.Type.LZ4);
+ }
+
+ @Test(timeout = 20000)
+ public void testZstdCompression() throws Exception {
+ testCompression(CompressionCodec.Type.ZSTD);
+ }
+
+ private void testCompression(CompressionCodec.Type codec) throws Exception {
byte[] data = getString(true).getBytes();
EnvelopedEntry writeEntry = new EnvelopedEntry(EnvelopedEntry.CURRENT_VERSION,
- CompressionCodec.Type.LZ4,
+ codec,
data,
data.length,
new NullStatsLogger());
diff --git a/distributedlog-protocol/pom.xml b/distributedlog-protocol/pom.xml
index 5e554826b..824ea1445 100644
--- a/distributedlog-protocol/pom.xml
+++ b/distributedlog-protocol/pom.xml
@@ -77,6 +77,11 @@
lz4
1.2.0
+
+ com.github.luben
+ zstd-jni
+ 1.1.2
+
junit
junit
diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetReader.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetReader.java
index 1648c6d6b..17f3f7e0c 100644
--- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetReader.java
+++ b/distributedlog-protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetReader.java
@@ -18,6 +18,7 @@
package org.apache.distributedlog;
import static org.apache.distributedlog.LogRecordSet.COMPRESSION_CODEC_LZ4;
+import static org.apache.distributedlog.LogRecordSet.COMPRESSION_CODEC_ZSTD;
import static org.apache.distributedlog.LogRecordSet.METADATA_COMPRESSION_MASK;
import static org.apache.distributedlog.LogRecordSet.METADATA_VERSION_MASK;
import static org.apache.distributedlog.LogRecordSet.NULL_OP_STATS_LOGGER;
@@ -80,7 +81,12 @@ class EnvelopedRecordSetReader implements LogRecordSet.Reader {
if (COMPRESSION_CODEC_LZ4 == codecCode) {
CompressionCodec codec = CompressionUtils.getCompressionCodec(CompressionCodec.Type.LZ4);
byte[] decompressedData = codec.decompress(compressedData, 0, actualDataLen,
- originDataLen, NULL_OP_STATS_LOGGER);
+ originDataLen, NULL_OP_STATS_LOGGER);
+ this.reader = ByteBuffer.wrap(decompressedData);
+ } else if (COMPRESSION_CODEC_ZSTD == codecCode) {
+ CompressionCodec codec = CompressionUtils.getCompressionCodec(CompressionCodec.Type.ZSTD);
+ byte[] decompressedData = codec.decompress(compressedData, 0, actualDataLen,
+ originDataLen, NULL_OP_STATS_LOGGER);
this.reader = ByteBuffer.wrap(decompressedData);
} else {
if (originDataLen != actualDataLen) {
diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetWriter.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetWriter.java
index 9d2d7a760..2600ebb90 100644
--- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetWriter.java
+++ b/distributedlog-protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetWriter.java
@@ -20,6 +20,7 @@
import static org.apache.distributedlog.LogRecord.MAX_LOGRECORD_SIZE;
import static org.apache.distributedlog.LogRecordSet.COMPRESSION_CODEC_LZ4;
import static org.apache.distributedlog.LogRecordSet.COMPRESSION_CODEC_NONE;
+import static org.apache.distributedlog.LogRecordSet.COMPRESSION_CODEC_ZSTD;
import static org.apache.distributedlog.LogRecordSet.HEADER_LEN;
import static org.apache.distributedlog.LogRecordSet.METADATA_COMPRESSION_MASK;
import static org.apache.distributedlog.LogRecordSet.METADATA_VERSION_MASK;
@@ -67,6 +68,9 @@ class EnvelopedRecordSetWriter implements LogRecordSet.Writer {
case LZ4:
this.codecCode = COMPRESSION_CODEC_LZ4;
break;
+ case ZSTD:
+ this.codecCode = COMPRESSION_CODEC_ZSTD;
+ break;
default:
this.codecCode = COMPRESSION_CODEC_NONE;
break;
@@ -148,7 +152,7 @@ ByteBuffer createBuffer() {
int dataOffset = HEADER_LEN;
int dataLen = buffer.size() - HEADER_LEN;
- if (COMPRESSION_CODEC_LZ4 != codecCode) {
+ if (COMPRESSION_CODEC_NONE == codecCode) {
ByteBuffer recordSetBuffer = ByteBuffer.wrap(data, 0, buffer.size());
// update count
recordSetBuffer.putInt(4, count);
diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/LogRecordSet.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/LogRecordSet.java
index 375ed3f71..480bf09fb 100644
--- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/LogRecordSet.java
+++ b/distributedlog-protocol/src/main/java/org/apache/distributedlog/LogRecordSet.java
@@ -76,6 +76,7 @@ public class LogRecordSet {
// Compression Codec
static final int COMPRESSION_CODEC_NONE = 0x0;
static final int COMPRESSION_CODEC_LZ4 = 0X1;
+ static final int COMPRESSION_CODEC_ZSTD = 0x2;
public static int numRecords(LogRecord record) throws IOException {
checkArgument(record.isRecordSet(),
diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/io/CompressionCodec.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/io/CompressionCodec.java
index 9a0e3a3e4..997d7e446 100644
--- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/io/CompressionCodec.java
+++ b/distributedlog-protocol/src/main/java/org/apache/distributedlog/io/CompressionCodec.java
@@ -27,7 +27,7 @@ public interface CompressionCodec {
* Enum specifying the currently supported compression types.
*/
enum Type {
- NONE, LZ4, UNKNOWN
+ NONE, LZ4, ZSTD, UNKNOWN
}
/**
diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/io/CompressionUtils.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/io/CompressionUtils.java
index 7bac92abb..234872b17 100644
--- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/io/CompressionUtils.java
+++ b/distributedlog-protocol/src/main/java/org/apache/distributedlog/io/CompressionUtils.java
@@ -24,9 +24,11 @@ public class CompressionUtils {
public static final String LZ4 = "lz4";
public static final String NONE = "none";
+ public static final String ZSTD = "zstd";
private static final CompressionCodec IDENTITY_CODEC = new IdentityCompressionCodec();
private static final CompressionCodec LZ4_CODEC = new LZ4CompressionCodec();
+ private static final CompressionCodec ZSTD_CODEC = new ZstdCompressionCodec();
/**
* Get a cached compression codec instance for the specified type.
@@ -36,6 +38,8 @@ public class CompressionUtils {
public static CompressionCodec getCompressionCodec(CompressionCodec.Type type) {
if (type == CompressionCodec.Type.LZ4) {
return LZ4_CODEC;
+ } else if (type == CompressionCodec.Type.ZSTD) {
+ return ZSTD_CODEC;
}
// No Compression
return IDENTITY_CODEC;
@@ -49,6 +53,8 @@ public static CompressionCodec getCompressionCodec(CompressionCodec.Type type) {
public static CompressionCodec.Type stringToType(String compressionString) {
if (compressionString.equals(LZ4)) {
return CompressionCodec.Type.LZ4;
+ } else if (compressionString.equals(ZSTD)) {
+ return CompressionCodec.Type.ZSTD;
} else if (compressionString.equals(NONE)) {
return CompressionCodec.Type.NONE;
} else {
diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/io/ZstdCompressionCodec.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/io/ZstdCompressionCodec.java
new file mode 100644
index 000000000..59b311842
--- /dev/null
+++ b/distributedlog-protocol/src/main/java/org/apache/distributedlog/io/ZstdCompressionCodec.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.io;
+
+import com.github.luben.zstd.Zstd;
+import com.google.common.base.Stopwatch;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+
+/**
+ * ZStandard Compression Codec.
+ *
+ * {@link http://facebook.github.io/zstd/}
+ */
+public class ZstdCompressionCodec implements CompressionCodec {
+
+ public ZstdCompressionCodec() {}
+
+ byte[] getOrCopyData(byte[] data, int offset, int length) {
+ byte[] copyOfData;
+ if (0 == offset && length == data.length) {
+ copyOfData = data;
+ } else {
+ copyOfData = Arrays.copyOfRange(data, offset, offset + length);
+ }
+ return copyOfData;
+ }
+
+ @Override
+ public byte[] compress(byte[] data, int offset, int length, OpStatsLogger compressionStat) {
+ Stopwatch watch = Stopwatch.createStarted();
+ // TODO: make the compression level configurable.
+ byte[] compressed = Zstd.compress(getOrCopyData(data, offset, length), 3);
+
+ compressionStat.registerSuccessfulEvent(watch.elapsed(TimeUnit.MICROSECONDS));
+ return compressed;
+ }
+
+ @Override
+ public byte[] decompress(byte[] data, int offset, int length, OpStatsLogger decompressionStat) {
+ Stopwatch watch = Stopwatch.createStarted();
+ byte[] compressedData = getOrCopyData(data, offset, length);
+ int decompressedSize = (int) Zstd.decompressedSize(compressedData);
+ byte[] decompressedData = new byte[decompressedSize];
+ // TODO: make the compression level configurable.
+ int actualDecompressedSize = (int) Zstd.decompress(decompressedData, compressedData);
+ if (actualDecompressedSize != decompressedSize) {
+ decompressedData = getOrCopyData(decompressedData, 0, actualDecompressedSize);
+ }
+ decompressionStat.registerSuccessfulEvent(watch.elapsed(TimeUnit.MICROSECONDS));
+ return decompressedData;
+ }
+
+ @Override
+ public byte[] decompress(byte[] data, int offset, int length, int decompressedSize,
+ OpStatsLogger decompressionStat) {
+ Stopwatch watch = Stopwatch.createStarted();
+ // TODO: make the compression level configurable.
+ byte[] decompressed = Zstd.decompress(getOrCopyData(data, offset, length), decompressedSize);
+ decompressionStat.registerSuccessfulEvent(watch.elapsed(TimeUnit.MICROSECONDS));
+ return decompressed;
+ }
+}
diff --git a/distributedlog-protocol/src/test/java/org/apache/distributedlog/TestLogRecordSet.java b/distributedlog-protocol/src/test/java/org/apache/distributedlog/TestLogRecordSet.java
index 95e03abe2..6099c7dd9 100644
--- a/distributedlog-protocol/src/test/java/org/apache/distributedlog/TestLogRecordSet.java
+++ b/distributedlog-protocol/src/test/java/org/apache/distributedlog/TestLogRecordSet.java
@@ -106,6 +106,11 @@ public void testWriteRecordsLZ4Compressed() throws Exception {
testWriteRecords(Type.LZ4);
}
+ @Test(timeout = 20000)
+ public void testWriteRecordsZstdCompressed() throws Exception {
+ testWriteRecords(Type.ZSTD);
+ }
+
void testWriteRecords(Type codec) throws Exception {
Writer writer = LogRecordSet.newWriter(1024, codec);
assertEquals("zero user bytes", HEADER_LEN, writer.getNumBytes());