Skip to content

Commit

Permalink
[INLONG-10729][Sort] Sorstandalone EsSink support transform
Browse files Browse the repository at this point in the history
  • Loading branch information
vernedeng committed Jul 29, 2024
1 parent ab543f8 commit 96f3bbe
Show file tree
Hide file tree
Showing 14 changed files with 317 additions and 22 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.encode;

import org.apache.inlong.sdk.transform.pojo.EsMapSinkInfo;
import org.apache.inlong.sdk.transform.pojo.FieldInfo;
import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.converter.TypeConverter;

import lombok.extern.slf4j.Slf4j;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@Slf4j
public class EsMapSinkEncoder implements SinkEncoder<Map<String, Object>> {

private final EsMapSinkInfo sinkInfo;
private final Map<String, TypeConverter> converters;

public EsMapSinkEncoder(EsMapSinkInfo sinkInfo) {
this.sinkInfo = sinkInfo;
this.converters = sinkInfo.getFields()
.stream()
.collect(Collectors.toMap(FieldInfo::getName, FieldInfo::getConverter));
}

@Override
public Map<String, Object> encode(SinkData sinkData, Context context) {
Map<String, Object> esMap = new HashMap<>();
for (FieldInfo fieldInfo : sinkInfo.getFields()) {
String fieldName = fieldInfo.getName();
String strValue = sinkData.getField(fieldName);
TypeConverter converter = converters.get(fieldName);
try {
esMap.put(fieldName, converter.convert(strValue));
} catch (Throwable t) {
log.warn("failed to serialize field ={}, value={}", fieldName, strValue, t);
esMap.put(fieldName, null);
}
}

return esMap;
}

@Override
public List<FieldInfo> getFields() {
return sinkInfo.getFields();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.inlong.sdk.transform.encode;

import org.apache.inlong.sdk.transform.pojo.CsvSinkInfo;
import org.apache.inlong.sdk.transform.pojo.EsMapSinkInfo;
import org.apache.inlong.sdk.transform.pojo.KvSinkInfo;

public class SinkEncoderFactory {
Expand All @@ -29,4 +30,8 @@ public static CsvSinkEncoder createCsvEncoder(CsvSinkInfo csvSinkInfo) {
public static KvSinkEncoder createKvEncoder(KvSinkInfo kvSinkInfo) {
return new KvSinkEncoder(kvSinkInfo);
}

public static EsMapSinkEncoder createEsMapEncoder(EsMapSinkInfo esMapSinkInfo) {
return new EsMapSinkEncoder(esMapSinkInfo);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public CsvSinkInfo(
@JsonProperty("delimiter") Character delimiter,
@JsonProperty("escapeChar") Character escapeChar,
@JsonProperty("fields") List<FieldInfo> fields) {
super(SourceInfo.CSV, charset);
super(SinkInfo.CSV, charset);
this.delimiter = delimiter;
this.escapeChar = escapeChar;
if (fields != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.pojo;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
import lombok.experimental.SuperBuilder;
import org.apache.commons.collections.CollectionUtils;

import java.util.List;

@JsonIgnoreProperties(ignoreUnknown = true)
@SuperBuilder
@Data
public class EsMapSinkInfo extends SinkInfo {

private List<FieldInfo> fields;

public EsMapSinkInfo(
@JsonProperty("charset") String charset,
@JsonProperty("fields") List<FieldInfo> fields) {
super(SinkInfo.ES_MAP, charset);
if (CollectionUtils.isEmpty(fields)) {
throw new IllegalArgumentException("failed to init es map sink info, fieldInfos is empty");
}
this.fields = fields;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.inlong.sdk.transform.pojo;

import org.apache.inlong.sdk.transform.process.converter.TypeConverter;

import lombok.Data;

/**
Expand All @@ -26,4 +28,14 @@
public class FieldInfo {

private String name;
private TypeConverter converter;

public FieldInfo() {

}

public FieldInfo(String name, TypeConverter converter) {
this.name = name;
this.converter = converter;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class KvSinkInfo extends SinkInfo {
public KvSinkInfo(
@JsonProperty("charset") String charset,
@JsonProperty("fields") List<FieldInfo> fields) {
super(SourceInfo.KV, charset);
super(SinkInfo.KV, charset);
if (fields != null) {
this.fields = fields;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import lombok.Data;
import lombok.experimental.SuperBuilder;

import java.util.Optional;
Expand All @@ -33,12 +34,17 @@
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
@JsonSubTypes({
@Type(value = CsvSinkInfo.class, name = SourceInfo.CSV),
@Type(value = KvSinkInfo.class, name = SourceInfo.KV),
@Type(value = CsvSinkInfo.class, name = SinkInfo.CSV),
@Type(value = KvSinkInfo.class, name = SinkInfo.KV),
})
@SuperBuilder
@Data
public abstract class SinkInfo {

public static final String CSV = "csv";
public static final String KV = "kv";
public static final String ES_MAP = "es_map";

@JsonIgnore
private String type;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.process.converter;

public interface TypeConverter {

Object convert(String value) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,16 @@
import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig;
import org.apache.inlong.common.pojo.sort.dataflow.dataType.KvConfig;
import org.apache.inlong.common.pojo.sort.dataflow.field.FieldConfig;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.BasicFormatInfo;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
import org.apache.inlong.sdk.transform.decode.SourceDecoder;
import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo;
import org.apache.inlong.sdk.transform.pojo.FieldInfo;
import org.apache.inlong.sdk.transform.pojo.KvSourceInfo;
import org.apache.inlong.sdk.transform.pojo.TransformConfig;
import org.apache.inlong.sdk.transform.process.TransformProcessor;
import org.apache.inlong.sdk.transform.process.converter.TypeConverter;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
Expand All @@ -42,7 +44,7 @@
import org.apache.inlong.sort.standalone.utils.BufferQueue;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;

import com.google.common.collect.Maps;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
Expand All @@ -69,7 +71,6 @@ public class SinkContext {
protected final String sinkName;
protected final Context sinkContext;
protected TaskConfig taskConfig;
protected Map<String, TransformProcessor<String, String>> transformMap;
@Deprecated
protected SortTaskConfig sortTaskConfig;
protected final Channel channel;
Expand All @@ -91,7 +92,6 @@ public SinkContext(String sinkName, Context context, Channel channel) {
this.reloadInterval = sinkContext.getLong(KEY_RELOADINTERVAL, 60000L);
this.metricItemSet = new SortMetricItemSet(sinkName);
this.unifiedConfiguration = CommonPropertiesHolder.useUnifiedConfiguration();
this.transformMap = Maps.newConcurrentMap();
MetricRegister.register(this.metricItemSet);
}

Expand Down Expand Up @@ -197,7 +197,14 @@ public static <U> BufferQueue<U> createBufferQueue() {
}

public TransformConfig createTransformConfig(DataFlowConfig dataFlowConfig) {
return new TransformConfig(dataFlowConfig.getTransformSql());
return new TransformConfig(dataFlowConfig.getTransformSql(), globalConfiguration());
}

public Map<String, Object> globalConfiguration() {
ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder();
builder.putAll(CommonPropertiesHolder.get());
builder.putAll(sinkContext.getParameters());
return builder.build();
}

public SourceDecoder<String> createSourceDecoder(SourceConfig sourceConfig) {
Expand Down Expand Up @@ -234,8 +241,14 @@ public SourceDecoder<String> createSourceDecoder(SourceConfig sourceConfig) {
}

public FieldInfo convertToTransformFieldInfo(FieldConfig config) {
FieldInfo fieldInfo = new FieldInfo();
fieldInfo.setName(config.getName());
return fieldInfo;
return new FieldInfo(config.getName(), deriveTypeConverter(config.getFormatInfo()));
}

public TypeConverter deriveTypeConverter(FormatInfo formatInfo) {

if (formatInfo instanceof BasicFormatInfo) {
return value -> ((BasicFormatInfo<?>) formatInfo).deserialize(value);
}
return value -> value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,26 @@
package org.apache.inlong.sort.standalone.sink.elasticsearch;

import org.apache.inlong.sdk.commons.protocol.EventConstants;
import org.apache.inlong.sdk.transform.process.TransformProcessor;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.utils.UnescapeHelper;

import lombok.extern.slf4j.Slf4j;

import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

/**
*
* DefaultEvent2IndexRequestHandler
*/
@Slf4j
public class DefaultEvent2IndexRequestHandler implements IEvent2IndexRequestHandler {

public static final String KEY_EXTINFO = "extinfo";
Expand Down Expand Up @@ -105,6 +110,32 @@ public EsIndexRequest parse(EsSinkContext context, ProfileEvent event) {
return indexRequest;
}

@Override
public List<EsIndexRequest> parse(
EsSinkContext context,
ProfileEvent event,
TransformProcessor<String, Map<String, Object>> processor) {
if (processor == null) {
log.error("find no any transform processor for es sink");
return null;
}

String uid = event.getUid();
EsIdConfig idConfig = context.getIdConfig(uid);
String indexName = idConfig.parseIndexName(event.getRawLogTime());
byte[] bodyBytes = event.getBody();
String strContext = new String(bodyBytes, Charset.defaultCharset());
// build
List<Map<String, Object>> esData = processor.transform(strContext);
return esData.stream()
.map(data -> {
EsIndexRequest indexRequest = new EsIndexRequest(indexName, event);
indexRequest.source(data);
return indexRequest;
})
.collect(Collectors.toList());
}

/**
* getExtInfo
*
Expand Down
Loading

0 comments on commit 96f3bbe

Please sign in to comment.