diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgDecodingFormat.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/rowdata/AbstractInLongMsgDecodingFormat.java similarity index 93% rename from inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgDecodingFormat.java rename to inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/rowdata/AbstractInLongMsgDecodingFormat.java index b87039b812b..a647b6f58e1 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgDecodingFormat.java +++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/rowdata/AbstractInLongMsgDecodingFormat.java @@ -15,7 +15,9 @@ * limitations under the License. */ -package org.apache.inlong.sort.formats.inlongmsg; +package org.apache.inlong.sort.formats.inlongmsg.rowdata; + +import org.apache.inlong.sort.formats.inlongmsg.InLongMsgMetadata; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.table.connector.format.DecodingFormat; diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgDeserializationSchema.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/rowdata/AbstractInLongMsgDeserializationSchema.java similarity index 97% rename from inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgDeserializationSchema.java rename to inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/rowdata/AbstractInLongMsgDeserializationSchema.java index 4a404fe25db..24d730ac9d4 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgDeserializationSchema.java +++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/rowdata/AbstractInLongMsgDeserializationSchema.java @@ -15,8 +15,9 @@ * limitations under the License. */ -package org.apache.inlong.sort.formats.inlongmsg; +package org.apache.inlong.sort.formats.inlongmsg.rowdata; +import org.apache.inlong.sort.formats.inlongmsg.InLongMsgWrap; import org.apache.inlong.sort.formats.metrics.FormatMetricGroup; import org.apache.flink.api.common.serialization.DeserializationSchema; diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgFormatDeserializer.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/rowdata/AbstractInLongMsgFormatDeserializer.java similarity index 93% rename from inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgFormatDeserializer.java rename to inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/rowdata/AbstractInLongMsgFormatDeserializer.java index 9f631851021..a07b7fbc2b7 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgFormatDeserializer.java +++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/rowdata/AbstractInLongMsgFormatDeserializer.java @@ -15,9 +15,14 @@ * limitations under the License. */ -package org.apache.inlong.sort.formats.inlongmsg; +package org.apache.inlong.sort.formats.inlongmsg.rowdata; import org.apache.inlong.common.msg.InLongMsg; +import org.apache.inlong.sort.formats.inlongmsg.FailureHandler; +import org.apache.inlong.sort.formats.inlongmsg.IgnoreFailureHandler; +import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody; +import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead; +import org.apache.inlong.sort.formats.inlongmsg.InLongMsgWrap; import org.apache.inlong.sort.formats.metrics.FormatMetricGroup; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgMixedFormatConverter.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/rowdata/AbstractInLongMsgMixedFormatConverter.java similarity index 98% rename from inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgMixedFormatConverter.java rename to inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/rowdata/AbstractInLongMsgMixedFormatConverter.java index 87168cb15b4..800d22e4695 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgMixedFormatConverter.java +++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/rowdata/AbstractInLongMsgMixedFormatConverter.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.sort.formats.inlongmsg; +package org.apache.inlong.sort.formats.inlongmsg.rowdata; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.table.data.RowData; diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgMixedFormatDeserializer.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/rowdata/AbstractInLongMsgMixedFormatDeserializer.java similarity index 91% rename from inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgMixedFormatDeserializer.java rename to inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/rowdata/AbstractInLongMsgMixedFormatDeserializer.java index 11e46c75cbd..9c689fc21d6 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgMixedFormatDeserializer.java +++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/rowdata/AbstractInLongMsgMixedFormatDeserializer.java @@ -15,7 +15,9 @@ * limitations under the License. */ -package org.apache.inlong.sort.formats.inlongmsg; +package org.apache.inlong.sort.formats.inlongmsg.rowdata; + +import org.apache.inlong.sort.formats.inlongmsg.FailureHandler; import javax.annotation.Nonnull; diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatConverter.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/rowdata/InLongMsgMixedFormatConverter.java similarity index 95% rename from inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatConverter.java rename to inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/rowdata/InLongMsgMixedFormatConverter.java index de9774fadac..bffc84d87fa 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatConverter.java +++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/rowdata/InLongMsgMixedFormatConverter.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.sort.formats.inlongmsg; +package org.apache.inlong.sort.formats.inlongmsg.rowdata; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatConverterBuilder.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/rowdata/InLongMsgMixedFormatConverterBuilder.java similarity index 92% rename from inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatConverterBuilder.java rename to inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/rowdata/InLongMsgMixedFormatConverterBuilder.java index 6067688902c..b91f0f4fbeb 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatConverterBuilder.java +++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/rowdata/InLongMsgMixedFormatConverterBuilder.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.sort.formats.inlongmsg; +package org.apache.inlong.sort.formats.inlongmsg.rowdata; import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo; @@ -27,8 +27,8 @@ import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_IGNORE_ERRORS; import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_NULL_LITERAL; import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_TIME_FIELD_NAME; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME; /** * The builder for {@link AbstractInLongMsgMixedFormatConverter}s. diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatConverterValidator.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/rowdata/InLongMsgMixedFormatConverterValidator.java similarity index 90% rename from inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatConverterValidator.java rename to inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/rowdata/InLongMsgMixedFormatConverterValidator.java index d793dff7fcd..826616a9ff6 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatConverterValidator.java +++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/rowdata/InLongMsgMixedFormatConverterValidator.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.sort.formats.inlongmsg; +package org.apache.inlong.sort.formats.inlongmsg.rowdata; import org.apache.inlong.sort.formats.base.FormatDescriptorValidator; @@ -23,7 +23,7 @@ import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_IGNORE_ERRORS; import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_NULL_LITERAL; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.validateInLongMsgSchema; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.validateInLongMsgSchema; /** * The validator for the properties of {@link AbstractInLongMsgMixedFormatConverter}s. diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatDeserializerValidator.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/rowdata/InLongMsgMixedFormatDeserializerValidator.java similarity index 97% rename from inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatDeserializerValidator.java rename to inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/rowdata/InLongMsgMixedFormatDeserializerValidator.java index d6e4a14774e..bd445f8c598 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatDeserializerValidator.java +++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/rowdata/InLongMsgMixedFormatDeserializerValidator.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.sort.formats.inlongmsg; +package org.apache.inlong.sort.formats.inlongmsg.rowdata; import org.apache.inlong.sort.formats.base.FormatDescriptorValidator; diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgTextMixedFormatDeserializerBuilder.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/rowdata/InLongMsgTextMixedFormatDeserializerBuilder.java similarity index 98% rename from inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgTextMixedFormatDeserializerBuilder.java rename to inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/rowdata/InLongMsgTextMixedFormatDeserializerBuilder.java index 10f74065e07..51d6c9a52d8 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgTextMixedFormatDeserializerBuilder.java +++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/rowdata/InLongMsgTextMixedFormatDeserializerBuilder.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.sort.formats.inlongmsg; +package org.apache.inlong.sort.formats.inlongmsg.rowdata; import org.apache.flink.table.descriptors.DescriptorProperties; diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/rowdata/InLongMsgUtils.java similarity index 96% rename from inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java rename to inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/rowdata/InLongMsgUtils.java index f48c1c7a0f2..0dcf7544520 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java +++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/rowdata/InLongMsgUtils.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.sort.formats.inlongmsg; +package org.apache.inlong.sort.formats.inlongmsg.rowdata; import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo; import org.apache.inlong.sort.formats.base.FieldToRowDataConverters; @@ -33,6 +33,11 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.TimestampType; import org.apache.flink.table.types.logical.VarCharType; +import org.apache.inlong.sort.formats.inlongmsg.IgnoreFailureHandler; +import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead; +import org.apache.inlong.sort.formats.inlongmsg.InLongMsgMetadata; +import org.apache.inlong.sort.formats.inlongmsg.FailureHandler; +import org.apache.inlong.sort.formats.inlongmsg.NoOpFailureHandler; import javax.annotation.Nullable; @@ -212,15 +217,15 @@ public static TypeInformation decorateRowTypeWithMetadata( // Metadata final List readableMetadata = metadataKeys.stream() .map(k -> Stream.of(InLongMsgMetadata.ReadableMetadata.values()) - .filter(rm -> rm.key.equals(k)) + .filter(rm -> rm.getKey().equals(k)) .findFirst() .orElseThrow(IllegalStateException::new)) .collect(Collectors.toList()); for (InLongMsgMetadata.ReadableMetadata rm : readableMetadata) { // TODO : avoid name conflict? - fieldNames.add(rm.key); - fieldTypes.add(rm.dataType.getLogicalType()); + fieldNames.add(rm.getKey()); + fieldTypes.add(rm.getDataType().getLogicalType()); } RowType rowType = RowType.of(fieldTypes.toArray(new LogicalType[0]), fieldNames.toArray(new String[0])); @@ -319,15 +324,15 @@ public static TypeInformation decorateRowTypeWithNeededHeadFieldsAndMet // Metadata final List readableMetadata = metadataKeys.stream() .map(k -> Stream.of(InLongMsgMetadata.ReadableMetadata.values()) - .filter(rm -> rm.key.equals(k)) + .filter(rm -> rm.getKey().equals(k)) .findFirst() .orElseThrow(IllegalStateException::new)) .collect(Collectors.toList()); for (InLongMsgMetadata.ReadableMetadata rm : readableMetadata) { // TODO : avoid name conflict? - fieldNames.add(rm.key); - fieldTypes.add(rm.dataType.getLogicalType()); + fieldNames.add(rm.getKey()); + fieldTypes.add(rm.getDataType().getLogicalType()); } RowType rowType = RowType.of(fieldTypes.toArray(new LogicalType[0]), fieldNames.toArray(new String[0])); diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogDecodingFormat.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogDecodingFormat.java index 0890cb99c0f..c6894cccb3f 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogDecodingFormat.java +++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogDecodingFormat.java @@ -17,7 +17,7 @@ package org.apache.inlong.sort.formats.inlongmsgbinlog; -import org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgDecodingFormat; +import org.apache.inlong.sort.formats.inlongmsg.rowdata.AbstractInLongMsgDecodingFormat; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.configuration.ReadableConfig; diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java index 53a0ba64f4c..3509a34cfb9 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java +++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java @@ -18,11 +18,11 @@ package org.apache.inlong.sort.formats.inlongmsgbinlog; import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo; -import org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgFormatDeserializer; +import org.apache.inlong.sort.formats.inlongmsg.rowdata.AbstractInLongMsgFormatDeserializer; import org.apache.inlong.sort.formats.inlongmsg.FailureHandler; import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody; import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead; -import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils; +import org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.table.data.RowData; diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogRowDataDeserializationSchema.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogRowDataDeserializationSchema.java index 63cb4bec38b..982da09faef 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogRowDataDeserializationSchema.java +++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogRowDataDeserializationSchema.java @@ -18,15 +18,15 @@ package org.apache.inlong.sort.formats.inlongmsgbinlog; import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo; -import org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgDeserializationSchema; -import org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgFormatDeserializer; +import org.apache.inlong.sort.formats.inlongmsg.rowdata.AbstractInLongMsgDeserializationSchema; +import org.apache.inlong.sort.formats.inlongmsg.rowdata.AbstractInLongMsgFormatDeserializer; import org.apache.flink.annotation.PublicEvolving; import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_INCLUDE_UPDATE_BEFORE; import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_METADATA_FIELD_NAME; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME; /** * Deserialization schema from inlong-msg-binlog to Flink Table & SQL internal data structures. diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java index 16ed2a7056f..ca5b4f04604 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java +++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java @@ -43,13 +43,13 @@ import java.util.Map; import static org.apache.inlong.sort.formats.base.TableFormatUtils.deserializeBasicField; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_ID; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_NAME; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_TID; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_STREAM_ID; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.getPredefinedFields; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseAttr; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseEpochTime; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_ID; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_NAME; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_TID; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.INLONGMSG_ATTR_STREAM_ID; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.getPredefinedFields; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.parseAttr; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.parseEpochTime; /** * Utilities for InLongMsgBinlog. diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvDecodingFormat.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvDecodingFormat.java index 7898367647a..da9b5b78755 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvDecodingFormat.java +++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvDecodingFormat.java @@ -17,7 +17,7 @@ package org.apache.inlong.sort.formats.inlongmsgcsv; -import org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgDecodingFormat; +import org.apache.inlong.sort.formats.inlongmsg.rowdata.AbstractInLongMsgDecodingFormat; import org.apache.commons.lang3.StringEscapeUtils; import org.apache.flink.api.common.serialization.DeserializationSchema; diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java index 3e9f0843f5c..a637a5df00c 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java +++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java @@ -22,11 +22,11 @@ import org.apache.inlong.sort.formats.base.FieldToRowDataConverters.FieldToRowDataConverter; import org.apache.inlong.sort.formats.base.TableFormatUtils; import org.apache.inlong.sort.formats.base.TextFormatBuilder; -import org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgFormatDeserializer; +import org.apache.inlong.sort.formats.inlongmsg.rowdata.AbstractInLongMsgFormatDeserializer; import org.apache.inlong.sort.formats.inlongmsg.FailureHandler; import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody; import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead; -import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils; +import org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.table.data.GenericRowData; @@ -43,8 +43,8 @@ import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_DELIMITER; import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_LINE_DELIMITER; import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_RETAIN_PREDEFINED_FIELD; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME; import static org.apache.inlong.sort.formats.inlongmsgcsv.InLongMsgCsvUtils.DEFAULT_DELETE_HEAD_DELIMITER; /** diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvRowDataDeserializationSchema.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvRowDataDeserializationSchema.java index d90cedb28df..0ecee5f4231 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvRowDataDeserializationSchema.java +++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvRowDataDeserializationSchema.java @@ -18,8 +18,8 @@ package org.apache.inlong.sort.formats.inlongmsgcsv; import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo; -import org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgDeserializationSchema; -import org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgFormatDeserializer; +import org.apache.inlong.sort.formats.inlongmsg.rowdata.AbstractInLongMsgDeserializationSchema; +import org.apache.inlong.sort.formats.inlongmsg.rowdata.AbstractInLongMsgFormatDeserializer; import org.apache.flink.annotation.PublicEvolving; @@ -32,8 +32,8 @@ import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_LINE_DELIMITER; import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_NULL_LITERAL; import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_QUOTE_CHARACTER; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME; /** * Deserialization schema from InLongMsg-CSV to Flink Table & SQL internal data structures. diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java index a7f0fd1905e..dad5198d11e 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java +++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java @@ -37,16 +37,16 @@ import java.util.stream.Collectors; import static org.apache.inlong.sort.formats.base.TableFormatUtils.deserializeBasicField; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_ID; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_NAME; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_TID; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_STREAM_ID; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_DT; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_T; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.getPredefinedFields; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseAttr; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseDateTime; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseEpochTime; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_ID; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_NAME; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_TID; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.INLONGMSG_ATTR_STREAM_ID; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.INLONGMSG_ATTR_TIME_DT; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.INLONGMSG_ATTR_TIME_T; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.getPredefinedFields; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.parseAttr; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.parseDateTime; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.parseEpochTime; import static org.apache.inlong.sort.formats.util.StringUtils.splitCsv; /** diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java index fb8e00ab419..370497cc862 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java +++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java @@ -59,8 +59,8 @@ import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_CHARSET; import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_DELIMITER; import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgMetadata.ReadableMetadata.STREAMID; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME; import static org.junit.Assert.assertEquals; /** diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvDecodingFormat.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvDecodingFormat.java index 0e063482cea..3031b0ce745 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvDecodingFormat.java +++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvDecodingFormat.java @@ -17,7 +17,7 @@ package org.apache.inlong.sort.formats.inlongmsgkv; -import org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgDecodingFormat; +import org.apache.inlong.sort.formats.inlongmsg.rowdata.AbstractInLongMsgDecodingFormat; import org.apache.commons.lang3.StringEscapeUtils; import org.apache.flink.api.common.serialization.DeserializationSchema; diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java index b3dff0dec8f..770822aceb3 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java +++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java @@ -22,11 +22,11 @@ import org.apache.inlong.sort.formats.base.FieldToRowDataConverters.FieldToRowDataConverter; import org.apache.inlong.sort.formats.base.TableFormatUtils; import org.apache.inlong.sort.formats.base.TextFormatBuilder; -import org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgFormatDeserializer; +import org.apache.inlong.sort.formats.inlongmsg.rowdata.AbstractInLongMsgFormatDeserializer; import org.apache.inlong.sort.formats.inlongmsg.FailureHandler; import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody; import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead; -import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils; +import org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.table.data.GenericRowData; @@ -43,8 +43,8 @@ import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_ENTRY_DELIMITER; import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_KV_DELIMITER; import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_LINE_DELIMITER; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME; /** * The deserializer for the records in InLongMsgKv format. diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvRowDataDeserializationSchema.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvRowDataDeserializationSchema.java index e2fbc293368..8a9564d0821 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvRowDataDeserializationSchema.java +++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvRowDataDeserializationSchema.java @@ -18,8 +18,8 @@ package org.apache.inlong.sort.formats.inlongmsgkv; import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo; -import org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgDeserializationSchema; -import org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgFormatDeserializer; +import org.apache.inlong.sort.formats.inlongmsg.rowdata.AbstractInLongMsgDeserializationSchema; +import org.apache.inlong.sort.formats.inlongmsg.rowdata.AbstractInLongMsgFormatDeserializer; import org.apache.flink.annotation.PublicEvolving; @@ -31,8 +31,8 @@ import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_NULL_LITERAL; import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_QUOTE_CHARACTER; import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_RETAIN_PREDEFINED_FIELD; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME; /** * Deserialization schema from InLongMsg-KV to Flink Table & SQL internal data structures. diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java index 8bcf77c1ecb..4536adcdca0 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java +++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java @@ -33,15 +33,15 @@ import java.util.stream.Collectors; import static org.apache.inlong.sort.formats.base.TableFormatUtils.deserializeBasicField; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_NAME; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_TID; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_STREAM_ID; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_DT; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_T; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.getPredefinedFields; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseAttr; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseDateTime; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseEpochTime; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_NAME; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_TID; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.INLONGMSG_ATTR_STREAM_ID; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.INLONGMSG_ATTR_TIME_DT; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.INLONGMSG_ATTR_TIME_T; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.getPredefinedFields; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.parseAttr; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.parseDateTime; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.parseEpochTime; import static org.apache.inlong.sort.formats.util.StringUtils.splitKv; /** diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvDeserializationSchema.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvDeserializationSchema.java index 6d29d839eeb..62a87f7be31 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvDeserializationSchema.java +++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvDeserializationSchema.java @@ -18,8 +18,8 @@ package org.apache.inlong.sort.formats.inlongmsgtlogcsv; import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo; -import org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgDeserializationSchema; -import org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgFormatDeserializer; +import org.apache.inlong.sort.formats.inlongmsg.rowdata.AbstractInLongMsgDeserializationSchema; +import org.apache.inlong.sort.formats.inlongmsg.rowdata.AbstractInLongMsgFormatDeserializer; import org.apache.flink.annotation.PublicEvolving; @@ -31,8 +31,8 @@ import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_ESCAPE_CHARACTER; import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_NULL_LITERAL; import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_QUOTE_CHARACTER; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME; /** * Deserialization schema from InLongMsg TlogCsv to Flink Table & SQL internal data structures. diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java index 814466f079a..111d69ca03f 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java +++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java @@ -22,11 +22,11 @@ import org.apache.inlong.sort.formats.base.FieldToRowDataConverters.FieldToRowDataConverter; import org.apache.inlong.sort.formats.base.TableFormatUtils; import org.apache.inlong.sort.formats.base.TextFormatBuilder; -import org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgFormatDeserializer; +import org.apache.inlong.sort.formats.inlongmsg.rowdata.AbstractInLongMsgFormatDeserializer; import org.apache.inlong.sort.formats.inlongmsg.FailureHandler; import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody; import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead; -import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils; +import org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.table.data.GenericRowData; @@ -41,8 +41,8 @@ import java.util.Objects; import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_DELIMITER; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME; /** * The deserializer for the records in InLongMsgTlogCsv format. diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java index a216cf3429c..0a8c158843b 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java +++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java @@ -36,12 +36,12 @@ import java.util.stream.Collectors; import static org.apache.inlong.sort.formats.base.TableFormatUtils.deserializeBasicField; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_DT; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_T; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.getPredefinedFields; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseAttr; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseDateTime; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseEpochTime; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.INLONGMSG_ATTR_TIME_DT; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.INLONGMSG_ATTR_TIME_T; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.getPredefinedFields; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.parseAttr; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.parseDateTime; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.parseEpochTime; import static org.apache.inlong.sort.formats.util.StringUtils.splitCsv; /** diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java index 7e83816b4d2..ee25bb10ac7 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java +++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java @@ -54,8 +54,8 @@ import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_CHARSET; import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_DELIMITER; import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgMetadata.ReadableMetadata.STREAMID; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME; import static org.junit.Assert.assertEquals; /** diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializer.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializer.java index dee2a5b7917..b9bc4c67c56 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializer.java +++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializer.java @@ -22,11 +22,11 @@ import org.apache.inlong.sort.formats.base.FieldToRowDataConverters.FieldToRowDataConverter; import org.apache.inlong.sort.formats.base.TableFormatUtils; import org.apache.inlong.sort.formats.base.TextFormatBuilder; -import org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgFormatDeserializer; +import org.apache.inlong.sort.formats.inlongmsg.rowdata.AbstractInLongMsgFormatDeserializer; import org.apache.inlong.sort.formats.inlongmsg.FailureHandler; import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody; import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead; -import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils; +import org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.table.data.GenericRowData; @@ -50,8 +50,8 @@ import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_ENTRY_DELIMITER; import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_KV_DELIMITER; import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_TIME_FIELD_NAME; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME; import static org.apache.inlong.sort.formats.inlongmsgtlogkv.InLongMsgTlogKvUtils.DEFAULT_INLONGMSG_TLOGKV_CHARSET; /** diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java index 7e5dc6f864b..7c0d4a21d37 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java +++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java @@ -32,12 +32,12 @@ import java.util.Map; import static org.apache.inlong.sort.formats.base.TableFormatUtils.deserializeBasicField; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_DT; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_T; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.getPredefinedFields; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseAttr; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseDateTime; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseEpochTime; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.INLONGMSG_ATTR_TIME_DT; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.INLONGMSG_ATTR_TIME_T; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.getPredefinedFields; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.parseAttr; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.parseDateTime; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.parseEpochTime; import static org.apache.inlong.sort.formats.util.StringUtils.splitCsv; import static org.apache.inlong.sort.formats.util.StringUtils.splitKv; diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvValidator.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvValidator.java index 8b163a4bb2d..02d30d00752 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvValidator.java +++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvValidator.java @@ -24,7 +24,7 @@ import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_DELIMITER; import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_ENTRY_DELIMITER; import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_KV_DELIMITER; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.validateInLongMsgSchema; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.validateInLongMsgSchema; /** * The validator for {@link InLongMsgTlogKv}. diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java index f8b4c9c5191..06d78c7d04e 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java +++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java @@ -47,8 +47,8 @@ import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_DELIMITER; import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_ENTRY_DELIMITER; import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_KV_DELIMITER; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME; -import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME; +import static org.apache.inlong.sort.formats.inlongmsg.rowdata.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME; import static org.apache.inlong.sort.formats.inlongmsgtlogkv.InLongMsgTlogKvUtils.DEFAULT_INLONGMSG_TLOGKV_CHARSET; import static org.junit.Assert.assertEquals;