Skip to content

Commit

Permalink
[INLONG-11018][SDK] Transform SQL supports COMPRESS and UNCOMPRESS Fu…
Browse files Browse the repository at this point in the history
…nctions (#11048)
  • Loading branch information
Zkplo authored Sep 14, 2024
1 parent 0aae16f commit af9acab
Show file tree
Hide file tree
Showing 14 changed files with 770 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.process.function;

import org.apache.inlong.sdk.transform.decode.SourceData;
import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.function.factory.CompressionAlgorithmFactory;
import org.apache.inlong.sdk.transform.process.function.handler.CompressHandler;
import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;

import lombok.extern.slf4j.Slf4j;
import net.sf.jsqlparser.expression.Expression;
import net.sf.jsqlparser.expression.Function;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.List;

/**
* CompressFunction
* description: compress(string_to_compress[,compress_type(default:deflater)])
* - Return NULL if string_to_compress is NULL.
* - Return "" if string_to_compress is "".
* - Return the result as a binary string.
*/
@Slf4j
@TransformFunction(names = {"compress"})
public class CompressFunction implements ValueParser {

private final ValueParser stringParser;
private final ValueParser compressTypeParser;

// ISO-8859-1 encoding does not convert negative numbers and can preserve the original values of byte arrays.
private final Charset CHARSET = StandardCharsets.ISO_8859_1;
private final String DEFAULT_COMPRESS_TYPE = "deflater";

public CompressFunction(Function expr) {
List<Expression> expressions = expr.getParameters().getExpressions();
stringParser = OperatorTools.buildParser(expressions.get(0));
if (expressions.size() == 2) {
compressTypeParser = OperatorTools.buildParser(expressions.get(1));
} else {
compressTypeParser = null;
}
}

@Override
public Object parse(SourceData sourceData, int rowIndex, Context context) {
Object stringObject = stringParser.parse(sourceData, rowIndex, context);
if (stringObject == null) {
return null;
}
String str = OperatorTools.parseString(stringObject);
if (str.isEmpty()) {
return "";
}

// parse compress type
String compressType = DEFAULT_COMPRESS_TYPE;
if (compressTypeParser != null) {
Object compressTypeObj = compressTypeParser.parse(sourceData, rowIndex, context);
if (compressTypeObj != null) {
compressType = OperatorTools.parseString(compressTypeObj);
}
}
compressType = compressType.toLowerCase();

try {
byte[] lengthBytes = intToLowByteArray(str.length());
CompressHandler handler = CompressionAlgorithmFactory.getCompressHandlerByName(compressType);
if (handler == null) {
throw new RuntimeException(compressType + " is not supported.");
}
byte[] compressBytes = handler.compress(str.getBytes(CHARSET));
return mergeByteArray(lengthBytes, compressBytes);
} catch (Exception e) {
log.error("Compression failed", e);
return null;
}
}

private String mergeByteArray(byte[] lengthBytes, byte[] dateBytes) throws IOException {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream(
lengthBytes.length + dateBytes.length);
outputStream.write(lengthBytes);
outputStream.write(dateBytes);
return new String(outputStream.toByteArray(), CHARSET);
}

// low byte first
public static byte[] intToLowByteArray(int length) {
return new byte[]{
(byte) (length & 0xFF),
(byte) ((length >> 8) & 0xFF),
(byte) ((length >> 16) & 0xFF),
(byte) ((length >> 24) & 0xFF)
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.process.function;

import org.apache.inlong.sdk.transform.decode.SourceData;
import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.function.factory.UnCompressionAlgorithmFactory;
import org.apache.inlong.sdk.transform.process.function.handler.UncompressHandler;
import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;

import lombok.extern.slf4j.Slf4j;
import net.sf.jsqlparser.expression.Expression;
import net.sf.jsqlparser.expression.Function;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

/**
* UnCompressFunction
* description: uncompress(string_to_uncompress[,compress_type(default:deflater)])
* - Return NULL if string_to_uncompress is NULL.
* - Return "" if string_to_uncompress is "".
* - Return the result as a string.
*/
@Slf4j
@TransformFunction(names = {"uncompress"})
public class UnCompressFunction implements ValueParser {

private final ValueParser stringParser;
private final ValueParser uncompressTypeParser;

// ISO-8859-1 encoding does not convert negative numbers and can preserve the original values of byte arrays.
private final Charset CHARSET = StandardCharsets.ISO_8859_1;
private final String DEFAULT_UNCOMPRESS_TYPE = "deflater";

public UnCompressFunction(Function expr) {
List<Expression> expressions = expr.getParameters().getExpressions();
stringParser = OperatorTools.buildParser(expressions.get(0));
if (expressions.size() == 2) {
uncompressTypeParser = OperatorTools.buildParser(expressions.get(1));
} else {
uncompressTypeParser = null;
}
}

@Override
public Object parse(SourceData sourceData, int rowIndex, Context context) {
// parse data
Object stringObject = stringParser.parse(sourceData, rowIndex, context);
if (stringObject == null) {
return null;
}
String str = OperatorTools.parseString(stringObject);
if (str.isEmpty()) {
return "";
} else if (str.startsWith("null")) {
return null;
}
// parse uncompress type
String uncompressType = DEFAULT_UNCOMPRESS_TYPE;
if (uncompressTypeParser != null) {
Object compressTypeObj = uncompressTypeParser.parse(sourceData, rowIndex, context);
if (compressTypeObj != null) {
uncompressType = OperatorTools.parseString(compressTypeObj);
}
}
uncompressType = uncompressType.toLowerCase();
// uncompress
try {
// The first four bytes are the data length
byte[] compressData = Arrays.copyOfRange(str.getBytes(CHARSET), 4, str.length());
UncompressHandler handler = UnCompressionAlgorithmFactory.getCompressHandlerByName(uncompressType);
if (handler == null) {
throw new RuntimeException(uncompressType + " is not supported.");
}
return new String(handler.uncompress(compressData));
} catch (Exception e) {
log.error("UnCompression failed", e);
return null;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.process.function.factory;

import org.apache.inlong.sdk.transform.process.function.handler.CompressHandler;
import org.apache.inlong.sdk.transform.process.function.handler.DeflaterCompress;
import org.apache.inlong.sdk.transform.process.function.handler.GzipCompress;
import org.apache.inlong.sdk.transform.process.function.handler.ZipCompress;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class CompressionAlgorithmFactory {

private static Map<String, CompressHandler> compressMap = new ConcurrentHashMap<>();

static {
compressMap.put("deflater", new DeflaterCompress());
compressMap.put("gzip", new GzipCompress());
compressMap.put("zip", new ZipCompress());
}

public static CompressHandler getCompressHandlerByName(String name) {
return compressMap.get(name);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.process.function.factory;

import org.apache.inlong.sdk.transform.process.function.handler.DeflaterUncompress;
import org.apache.inlong.sdk.transform.process.function.handler.GzipUncompress;
import org.apache.inlong.sdk.transform.process.function.handler.UncompressHandler;
import org.apache.inlong.sdk.transform.process.function.handler.ZipUncompress;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class UnCompressionAlgorithmFactory {

private static Map<String, UncompressHandler> unCompressMap = new ConcurrentHashMap<>();

static {
unCompressMap.put("deflater", new DeflaterUncompress());
unCompressMap.put("gzip", new GzipUncompress());
unCompressMap.put("zip", new ZipUncompress());
}

public static UncompressHandler getCompressHandlerByName(String name) {
return unCompressMap.get(name);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.process.function.handler;

public interface CompressHandler {

byte[] compress(byte[] data) throws Exception;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.process.function.handler;

import java.io.ByteArrayOutputStream;
import java.util.zip.Deflater;

public class DeflaterCompress implements CompressHandler {

@Override
public byte[] compress(byte[] data) throws Exception {
Deflater deflater = new Deflater();
deflater.setInput(data);
deflater.finish();

ByteArrayOutputStream outputStream = new ByteArrayOutputStream(data.length);
byte[] buffer = new byte[1024];
while (!deflater.finished()) {
int count = deflater.deflate(buffer);
outputStream.write(buffer, 0, count);
}
outputStream.close();
return outputStream.toByteArray();
}
}
Loading

0 comments on commit af9acab

Please sign in to comment.