Skip to content

Commit

Permalink
[INLONG-9247][Sort] TubeMQ source support audit when the deserialized…
Browse files Browse the repository at this point in the history
… type is not InlongMsg
  • Loading branch information
vernedeng committed Nov 10, 2023
1 parent 378c503 commit 5d6ae40
Show file tree
Hide file tree
Showing 8 changed files with 228 additions and 139 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ public abstract class ExtractNode implements Node {

public static final String INLONG_MSG = "inlong-msg";

public static final String INLONG_MSG_AUDIT_TIME = "value.data-time";

public static final String CONSUME_AUDIT_TIME = "consume_time";

@JsonProperty("id")
private String id;
@JsonInclude(Include.NON_NULL)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.inlong.sort.protocol.constant.TubeMQConstant;
import org.apache.inlong.sort.protocol.node.ExtractNode;
import org.apache.inlong.sort.protocol.node.format.Format;
import org.apache.inlong.sort.protocol.node.format.InLongMsgFormat;
import org.apache.inlong.sort.protocol.transformation.WatermarkField;

import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -129,10 +130,14 @@ public String getMetadataKey(MetaField metaField) {
String metadataKey;
switch (metaField) {
case AUDIT_DATA_TIME:
metadataKey = "value.data-time";
if (format instanceof InLongMsgFormat) {
metadataKey = INLONG_MSG_AUDIT_TIME;
} else {
metadataKey = CONSUME_AUDIT_TIME;
}
break;
default:
throw new UnsupportedOperationException(String.format("Unsupport meta field for %s: %s",
throw new UnsupportedOperationException(String.format("Unsupported meta field for %s: %s",
this.getClass().getSimpleName(), metaField));
}
return metadataKey;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>sort-common</artifactId>
<version>${project.version}</version>
</dependency>

</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.inlong.sort.tubemq;

import org.apache.inlong.sort.tubemq.table.DynamicTubeMQDeserializationSchema;
import org.apache.inlong.sort.tubemq.table.TubeMQOptions;
import org.apache.inlong.tubemq.client.config.ConsumerConfig;
import org.apache.inlong.tubemq.client.consumer.ConsumePosition;
Expand Down Expand Up @@ -92,7 +93,7 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T>
/**
* The deserializer for records.
*/
private final DeserializationSchema<T> deserializationSchema;
private final DynamicTubeMQDeserializationSchema<T> deserializationSchema;

/**
* The random key for TubeMQ consumer group when startup.
Expand Down Expand Up @@ -158,7 +159,7 @@ public FlinkTubeMQConsumer(
String topic,
TreeSet<String> streamIdSet,
String consumerGroup,
DeserializationSchema<T> deserializationSchema,
DynamicTubeMQDeserializationSchema<T> deserializationSchema,
Configuration configuration,
String sessionKey,
Boolean innerFormat) {
Expand Down Expand Up @@ -208,7 +209,7 @@ public void initializeState(FunctionInitializationContext context) throws Except
@Override
public void open(Configuration parameters) throws Exception {

deserializationSchema.open(null);
deserializationSchema.open();
ConsumerConfig consumerConfig = new ConsumerConfig(masterAddress, consumerGroup);
consumerConfig.setConsumePosition(consumeFromMax
? ConsumePosition.CONSUMER_FROM_LATEST_OFFSET
Expand Down Expand Up @@ -292,14 +293,14 @@ private Instant getRecords(Instant lastConsumeInstant, List<Message> messageList
lastConsumeInstant = Instant.now();
if (!innerFormat) {
for (Message message : messageList) {
T record = deserializationSchema.deserialize(message.getData());
T record = deserializationSchema.deserialize(message);
records.add(record);
}
} else {
List<RowData> rowDataList = new ArrayList<>();
ListCollector<RowData> out = new ListCollector<>(rowDataList);
for (Message message : messageList) {
deserializationSchema.deserialize(message.getData(), (Collector<T>) out);
deserializationSchema.deserialize(message, (Collector<T>) out);
}
rowDataList.forEach(data -> records.add((T) data));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,143 +17,44 @@

package org.apache.inlong.sort.tubemq.table;

import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricsCollector;
import org.apache.inlong.sort.base.metric.SourceMetricData;
import org.apache.inlong.tubemq.corebase.Message;

import com.google.common.base.Objects;
import org.apache.flink.api.common.functions.util.ListCollector;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.inlong.tubemq.corebase.Message;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class DynamicTubeMQDeserializationSchema implements DeserializationSchema<RowData> {

private static final Logger LOG = LoggerFactory.getLogger(DynamicTubeMQDeserializationSchema.class);
/**
* data buffer message
*/
private final DeserializationSchema<RowData> deserializationSchema;
public interface DynamicTubeMQDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

/**
* {@link MetadataConverter} of how to produce metadata from message.
*/
private final MetadataConverter[] metadataConverters;
@PublicEvolving
default void open() throws Exception {}

/**
* {@link TypeInformation} of the produced {@link RowData} (physical + meta data).
* Deserializes a TubeMQ message.
*
* @param message The TubeMQ {@link Message} to deserialize.
* @return The deserialized message as an object (null if the message cannot be deserialized).
*/
private final TypeInformation<RowData> producedTypeInfo;
T deserialize(Message message) throws IOException;

/**
* status of error
* Deserializes a TubeMQ message.
*
* <p>Can output multiple records through the {@link Collector}. Note that the number and size of
* the produced records should be relatively small. Depending on the source implementation,
* records can be buffered in memory, or collecting records might delay emitting the checkpoint
* barrier.
*
* @param message The TubeMQ {@link Message} to deserialize.
* @param out The collector to put the resulting messages.
*/
private final boolean ignoreErrors;

private SourceMetricData sourceMetricData;

private MetricOption metricOption;

public DynamicTubeMQDeserializationSchema(
DeserializationSchema<RowData> schema,
MetadataConverter[] metadataConverters,
TypeInformation<RowData> producedTypeInfo,
boolean ignoreErrors,
MetricOption metricOption) {
this.deserializationSchema = schema;
this.metadataConverters = metadataConverters;
this.producedTypeInfo = producedTypeInfo;
this.ignoreErrors = ignoreErrors;
this.metricOption = metricOption;
}

@Override
public void open(InitializationContext context) {
if (metricOption != null) {
sourceMetricData = new SourceMetricData(metricOption);
@PublicEvolving
default void deserialize(Message message, Collector<T> out) throws IOException {
T deserialize = deserialize(message);
if (deserialize != null) {
out.collect(deserialize);
}
}

@Override
public RowData deserialize(byte[] bytes) throws IOException {
return deserializationSchema.deserialize(bytes);
}

@Override
public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
List<RowData> rows = new ArrayList<>();
deserializationSchema.deserialize(message,
new MetricsCollector<>(new ListCollector<>(rows), sourceMetricData));
rows.forEach(out::collect);
}

@Override
public boolean isEndOfStream(RowData rowData) {
return false;
}

@Override
public TypeInformation<RowData> getProducedType() {
return producedTypeInfo;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof DynamicTubeMQDeserializationSchema)) {
return false;
}
DynamicTubeMQDeserializationSchema that = (DynamicTubeMQDeserializationSchema) o;
return ignoreErrors == that.ignoreErrors
&& Objects.equal(Arrays.stream(metadataConverters).collect(Collectors.toList()),
Arrays.stream(that.metadataConverters).collect(Collectors.toList()))
&& Objects.equal(deserializationSchema, that.deserializationSchema)
&& Objects.equal(producedTypeInfo, that.producedTypeInfo);
}

@Override
public int hashCode() {
return Objects.hashCode(deserializationSchema, metadataConverters, producedTypeInfo, ignoreErrors);
}

/**
* add metadata column
*/
private void emitRow(Message head, GenericRowData physicalRow, Collector<RowData> out) {
if (metadataConverters.length == 0) {
out.collect(physicalRow);
return;
}
final int physicalArity = physicalRow.getArity();
final int metadataArity = metadataConverters.length;
final GenericRowData producedRow =
new GenericRowData(physicalRow.getRowKind(), physicalArity + metadataArity);
for (int physicalPos = 0; physicalPos < physicalArity; physicalPos++) {
producedRow.setField(physicalPos, physicalRow.getField(physicalPos));
}
for (int metadataPos = 0; metadataPos < metadataArity; metadataPos++) {
producedRow.setField(
physicalArity + metadataPos, metadataConverters[metadataPos].read(head));
}
out.collect(producedRow);
}

interface MetadataConverter extends Serializable {

Object read(Message head);
}
}
Loading

0 comments on commit 5d6ae40

Please sign in to comment.