diff --git a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java index a606ad14e87..1fc096805bd 100644 --- a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java +++ b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java @@ -97,6 +97,9 @@ public final class InLongMsgTlogCsvFormatDeserializer extends AbstractInLongMsgF @Nullable private final String nullLiteral; + @Nonnull + private Boolean isIncludeFirstSegment = false; + public InLongMsgTlogCsvFormatDeserializer( @Nonnull RowFormatInfo rowFormatInfo, @Nullable String timeFieldName, @@ -116,6 +119,7 @@ public InLongMsgTlogCsvFormatDeserializer( escapeChar, quoteChar, nullLiteral, + false, InLongMsgUtils.getDefaultExceptionHandler(ignoreErrors)); } @@ -128,6 +132,7 @@ public InLongMsgTlogCsvFormatDeserializer( @Nullable Character escapeChar, @Nullable Character quoteChar, @Nullable String nullLiteral, + @Nullable boolean isIncludeFirstSegment, @Nonnull FailureHandler failureHandler) { super(failureHandler); @@ -139,6 +144,7 @@ public InLongMsgTlogCsvFormatDeserializer( this.escapeChar = escapeChar; this.quoteChar = quoteChar; this.nullLiteral = nullLiteral; + this.isIncludeFirstSegment = isIncludeFirstSegment; } @Override @@ -154,7 +160,8 @@ protected InLongMsgHead parseHead(String attr) throws Exception { @Override protected List parseBodyList(byte[] bytes) throws Exception { return Collections.singletonList( - InLongMsgTlogCsvUtils.parseBody(bytes, charset, delimiter, escapeChar, quoteChar)); + InLongMsgTlogCsvUtils.parseBody(bytes, charset, delimiter, escapeChar, + quoteChar, isIncludeFirstSegment)); } @Override @@ -183,7 +190,7 @@ public static class Builder extends TextFormatBuilder { private String timeFieldName = DEFAULT_TIME_FIELD_NAME; private String attributesFieldName = DEFAULT_ATTRIBUTES_FIELD_NAME; private Character delimiter = DEFAULT_DELIMITER; - + private boolean isIncludeFirstSegment = true; public Builder(RowFormatInfo rowFormatInfo) { super(rowFormatInfo); } @@ -226,7 +233,8 @@ public InLongMsgTlogCsvFormatDeserializer build() { escapeChar, quoteChar, nullLiteral, - ignoreErrors); + isIncludeFirstSegment, + InLongMsgUtils.getDefaultExceptionHandler(ignoreErrors)); } } @@ -252,13 +260,14 @@ public boolean equals(Object o) { delimiter.equals(that.delimiter) && Objects.equals(escapeChar, that.escapeChar) && Objects.equals(quoteChar, that.quoteChar) && - Objects.equals(nullLiteral, that.nullLiteral); + Objects.equals(nullLiteral, that.nullLiteral) && + Objects.equals(isIncludeFirstSegment, that.isIncludeFirstSegment); } @Override public int hashCode() { return Objects.hash(super.hashCode(), rowFormatInfo, timeFieldName, attributesFieldName, charset, delimiter, escapeChar, quoteChar, - nullLiteral); + nullLiteral, isIncludeFirstSegment); } } diff --git a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvMixedFormatDeserializer.java b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvMixedFormatDeserializer.java index 46723f058c0..11f86d204e4 100644 --- a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvMixedFormatDeserializer.java +++ b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvMixedFormatDeserializer.java @@ -69,13 +69,17 @@ public final class InLongMsgTlogCsvMixedFormatDeserializer @Nullable private final Character quoteChar; + @Nonnull + private Boolean isIncludeFirstSegment = false; + public InLongMsgTlogCsvMixedFormatDeserializer( @Nonnull String charset, @Nonnull Character delimiter, @Nullable Character escapeChar, @Nullable Character quoteChar, @Nonnull Boolean ignoreErrors) { - this(charset, delimiter, escapeChar, quoteChar, InLongMsgUtils.getDefaultExceptionHandler(ignoreErrors)); + this(charset, delimiter, escapeChar, quoteChar, false, + InLongMsgUtils.getDefaultExceptionHandler(ignoreErrors)); } public InLongMsgTlogCsvMixedFormatDeserializer( @@ -83,6 +87,7 @@ public InLongMsgTlogCsvMixedFormatDeserializer( @Nonnull Character delimiter, @Nullable Character escapeChar, @Nullable Character quoteChar, + @Nonnull Boolean isIncludeFirstSegment, @Nonnull FailureHandler failureHandler) { super(failureHandler); @@ -90,6 +95,7 @@ public InLongMsgTlogCsvMixedFormatDeserializer( this.charset = charset; this.escapeChar = escapeChar; this.quoteChar = quoteChar; + this.isIncludeFirstSegment = isIncludeFirstSegment; } @Override @@ -105,7 +111,8 @@ protected InLongMsgHead parseHead(String attr) throws Exception { @Override protected List parseBodyList(byte[] bytes) throws Exception { return Collections.singletonList( - InLongMsgTlogCsvUtils.parseBody(bytes, charset, delimiter, escapeChar, quoteChar)); + InLongMsgTlogCsvUtils.parseBody(bytes, charset, delimiter, escapeChar, + quoteChar, isIncludeFirstSegment)); } @Override diff --git a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java index 79784249e78..9e0e35952b5 100644 --- a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java +++ b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java @@ -79,7 +79,8 @@ public static InLongMsgBody parseBody( String charset, char delimiter, Character escapeChar, - Character quoteChar) { + Character quoteChar, + boolean isIncludeFirstSegment) { String text; if (bytes[0] == delimiter) { text = new String(bytes, 1, bytes.length - 1, Charset.forName(charset)); @@ -91,7 +92,7 @@ public static InLongMsgBody parseBody( String streamId = segments[0]; List fields = - Arrays.stream(segments, 1, segments.length).collect(Collectors.toList()); + Arrays.stream(segments, (isIncludeFirstSegment ? 0 : 1), segments.length).collect(Collectors.toList()); return new InLongMsgBody(bytes, streamId, fields, Collections.emptyMap()); } diff --git a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java index f24e7887874..e24932fb3b6 100644 --- a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java +++ b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java @@ -52,10 +52,11 @@ public class InLongMsgTlogCsvFormatDeserializerTest { private static final RowFormatInfo TEST_ROW_INFO = new RowFormatInfo( - new String[]{"f1", "f2", "f3", "f4", "f5"}, + new String[]{"__addcol1_", "__addcol2_", "f1", "f2", "f3", "f4"}, new FormatInfo[]{ IntFormatInfo.INSTANCE, - IntFormatInfo.INSTANCE, + StringFormatInfo.INSTANCE, + StringFormatInfo.INSTANCE, StringFormatInfo.INSTANCE, StringFormatInfo.INSTANCE, StringFormatInfo.INSTANCE @@ -74,6 +75,7 @@ public void testExceptionHandler() throws Exception { null, null, null, + true, errorHandler); InLongMsg inLongMsg1 = InLongMsg.newInLongMsg(true); diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java index 02bca5450ea..f48c1c7a0f2 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java +++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java @@ -73,8 +73,8 @@ public class InLongMsgUtils { public static final String INLONGMSG_ATTR_TIME_DT = "dt"; public static final String INLONGMSG_ATTR_ADD_COLUMN_PREFIX = "__addcol"; - public static final String DEFAULT_TIME_FIELD_NAME = null; - public static final String DEFAULT_ATTRIBUTES_FIELD_NAME = null; + public static final String DEFAULT_TIME_FIELD_NAME = "inlongmsg_time"; + public static final String DEFAULT_ATTRIBUTES_FIELD_NAME = "inlongmsg_attributes"; private static final FieldToRowDataConverters.FieldToRowDataConverter TIME_FIELD_CONVERTER = FieldToRowDataConverters.createConverter(new TimestampType()); diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java index c0cdc68b30f..814466f079a 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java +++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java @@ -92,6 +92,8 @@ public final class InLongMsgTlogCsvFormatDeserializer extends AbstractInLongMsgF @Nullable private final Character quoteChar; + @Nonnull + private Boolean isIncludeFirstSegment = false; /** * The literal represented null values, default "". */ @@ -123,6 +125,7 @@ public InLongMsgTlogCsvFormatDeserializer( quoteChar, nullLiteral, metadataKeys, + false, InLongMsgUtils.getDefaultExceptionHandler(ignoreErrors)); } @@ -136,6 +139,7 @@ public InLongMsgTlogCsvFormatDeserializer( @Nullable Character quoteChar, @Nullable String nullLiteral, List metadataKeys, + @Nonnull Boolean isIncludeFirstSegment, @Nonnull FailureHandler failureHandler) { super(failureHandler); @@ -148,7 +152,7 @@ public InLongMsgTlogCsvFormatDeserializer( this.quoteChar = quoteChar; this.nullLiteral = nullLiteral; this.metadataKeys = metadataKeys; - + this.isIncludeFirstSegment = isIncludeFirstSegment; converters = Arrays.stream(rowFormatInfo.getFieldFormatInfos()) .map(formatInfo -> FieldToRowDataConverters.createConverter( TableFormatUtils.deriveLogicalType(formatInfo))) @@ -172,7 +176,8 @@ protected InLongMsgHead parseHead(String attr) throws Exception { @Override protected List parseBodyList(byte[] bytes) throws Exception { return Collections.singletonList( - InLongMsgTlogCsvUtils.parseBody(bytes, charset, delimiter, escapeChar, quoteChar)); + InLongMsgTlogCsvUtils.parseBody(bytes, charset, delimiter, escapeChar, + quoteChar, isIncludeFirstSegment)); } @Override @@ -204,6 +209,7 @@ public static class Builder extends TextFormatBuilder { private String attributesFieldName = DEFAULT_ATTRIBUTES_FIELD_NAME; private Character delimiter = DEFAULT_DELIMITER; private List metadataKeys = Collections.emptyList(); + private boolean isIncludeFirstSegment = false; public Builder(RowFormatInfo rowFormatInfo) { super(rowFormatInfo); @@ -229,6 +235,11 @@ public Builder setMetadataKeys(List metadataKeys) { return this; } + public Builder setIncludeFirstSegment(boolean isIncludeFirstSegment) { + this.isIncludeFirstSegment = isIncludeFirstSegment; + return this; + } + public InLongMsgTlogCsvFormatDeserializer build() { return new InLongMsgTlogCsvFormatDeserializer( rowFormatInfo, @@ -240,7 +251,8 @@ public InLongMsgTlogCsvFormatDeserializer build() { quoteChar, nullLiteral, metadataKeys, - ignoreErrors); + isIncludeFirstSegment, + InLongMsgUtils.getDefaultExceptionHandler(ignoreErrors)); } } @@ -267,13 +279,14 @@ public boolean equals(Object o) { Objects.equals(escapeChar, that.escapeChar) && Objects.equals(quoteChar, that.quoteChar) && Objects.equals(nullLiteral, that.nullLiteral) && - Objects.equals(metadataKeys, that.metadataKeys); + Objects.equals(metadataKeys, that.metadataKeys) && + Objects.equals(isIncludeFirstSegment, that.isIncludeFirstSegment); } @Override public int hashCode() { return Objects.hash(super.hashCode(), rowFormatInfo, timeFieldName, attributesFieldName, charset, delimiter, escapeChar, quoteChar, - nullLiteral, metadataKeys); + nullLiteral, metadataKeys, isIncludeFirstSegment); } } diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java index 8523ff544d0..a216cf3429c 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java +++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java @@ -80,7 +80,8 @@ public static InLongMsgBody parseBody( String charset, char delimiter, Character escapeChar, - Character quoteChar) { + Character quoteChar, + boolean isIncludeFirstSegment) { String text; if (bytes[0] == delimiter) { text = new String(bytes, 1, bytes.length - 1, Charset.forName(charset)); @@ -92,7 +93,7 @@ public static InLongMsgBody parseBody( String tid = segments[0]; List fields = - Arrays.stream(segments, 1, segments.length).collect(Collectors.toList()); + Arrays.stream(segments, (isIncludeFirstSegment ? 0 : 1), segments.length).collect(Collectors.toList()); return new InLongMsgBody(bytes, tid, fields, Collections.emptyMap()); } diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java index 8d44ae4dd68..efc9e886e32 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java +++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java @@ -68,10 +68,11 @@ public class InLongMsgTlogCsvFormatDeserializerTest { private static final RowFormatInfo TEST_ROW_INFO = new RowFormatInfo( - new String[]{"f1", "f2", "f3", "f4", "f5"}, + new String[]{"__addcol1_", "__addcol2_", "f1", "f2", "f3", "f4"}, new FormatInfo[]{ IntFormatInfo.INSTANCE, - IntFormatInfo.INSTANCE, + StringFormatInfo.INSTANCE, + StringFormatInfo.INSTANCE, StringFormatInfo.INSTANCE, StringFormatInfo.INSTANCE, StringFormatInfo.INSTANCE @@ -91,6 +92,7 @@ public void testExceptionHandler() throws Exception { null, null, Collections.emptyList(), + false, errorHandler); InLongMsg inLongMsg1 = InLongMsg.newInLongMsg(true);