diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java index 01db0527e9b..dc39f6c5a7b 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java @@ -77,6 +77,7 @@ public class StatConstants { public static final java.lang.String EVENT_MSG_TXT_LEN_MALFORMED = "msg.txt.len.malformed"; public static final java.lang.String EVENT_MSG_ITEM_LEN_MALFORMED = "msg.item.len.malformed"; public static final java.lang.String EVENT_MSG_TYPE_5_LEN_MALFORMED = "msg.type5.len.malformed"; + public static final java.lang.String EVENT_MSG_TYPE_5_CNT_UNEQUAL = "msg.type5.cnt.unequal"; public static final java.lang.String EVENT_MSG_ATTR_INVALID = "msg.attr.invalid"; public static final java.lang.String EVENT_MSG_ORDER_ACK_INVALID = "msg.attr.order.noack"; public static final java.lang.String EVENT_MSG_PROXY_ACK_INVALID = "msg.attr.proxy.noack"; diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java index fcba591e651..31178909227 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java @@ -558,11 +558,14 @@ private void procBinHeartbeatMsg(BaseSource source, Channel channel, */ public static ByteBuf buildBinMsgRspPackage(String attrs, long uniqVal) { // calculate total length - // binTotalLen = mstType + uniq + attrsLen + attrs + magic - int binTotalLen = 1 + 4 + 2 + 2; + int attrsLen = 0; + byte[] byTeAttrs = null; if (null != attrs) { - binTotalLen += attrs.length(); + byTeAttrs = attrs.getBytes(StandardCharsets.UTF_8); + attrsLen = byTeAttrs.length; } + // binTotalLen = mstType + uniq + attrsLen + attrs + magic + int binTotalLen = 1 + 4 + 2 + attrsLen + 2; // allocate buffer and write fields ByteBuf binBuffer = ByteBufAllocator.DEFAULT.buffer(4 + binTotalLen); binBuffer.writeInt(binTotalLen); @@ -573,11 +576,9 @@ public static ByteBuf buildBinMsgRspPackage(String attrs, long uniqVal) { uniq[2] = (byte) ((uniqVal >> 8) & 0xFF); uniq[3] = (byte) (uniqVal & 0xFF); binBuffer.writeBytes(uniq); - if (null != attrs) { - binBuffer.writeShort(attrs.length()); - binBuffer.writeBytes(attrs.getBytes(StandardCharsets.UTF_8)); - } else { - binBuffer.writeShort(0x0); + binBuffer.writeShort(attrsLen); + if (attrsLen > 0) { + binBuffer.writeBytes(byTeAttrs); } binBuffer.writeShort(0xee01); return binBuffer; @@ -593,8 +594,10 @@ public static ByteBuf buildBinMsgRspPackage(String attrs, long uniqVal) { public static ByteBuf buildTxtMsgRspPackage(MsgType msgType, String attrs) { int attrsLen = 0; int bodyLen = 0; + byte[] byTeAttrs = null; if (attrs != null) { - attrsLen = attrs.length(); + byTeAttrs = attrs.getBytes(StandardCharsets.UTF_8); + attrsLen = byTeAttrs.length; } // backTotalLen = mstType + bodyLen + body + attrsLen + attrs int backTotalLen = 1 + 4 + bodyLen + 4 + attrsLen; @@ -604,7 +607,7 @@ public static ByteBuf buildTxtMsgRspPackage(MsgType msgType, String attrs) { buffer.writeInt(bodyLen); buffer.writeInt(attrsLen); if (attrsLen > 0) { - buffer.writeBytes(attrs.getBytes(StandardCharsets.UTF_8)); + buffer.writeBytes(byTeAttrs); } return buffer; } @@ -620,15 +623,20 @@ public static ByteBuf buildTxtMsgRspPackage(MsgType msgType, String attrs) { private ByteBuf buildTxtMsgRspPackage(MsgType msgType, String attrs, AbsV0MsgCodec msgObj) { int attrsLen = 0; int bodyLen = 0; - byte[] backBody = null; + byte[] backBody; + byte[] byTeAttrs = null; if (attrs != null) { - attrsLen = attrs.length(); + byTeAttrs = attrs.getBytes(StandardCharsets.UTF_8); + attrsLen = byTeAttrs.length; } if (MsgType.MSG_ORIGINAL_RETURN.equals(msgType)) { backBody = msgObj.getOrigBody(); if (backBody != null) { bodyLen = backBody.length; } + } else { + backBody = new byte[]{50}; + bodyLen = backBody.length; } // backTotalLen = mstType + bodyLen + body + attrsLen + attrs int backTotalLen = 1 + 4 + bodyLen + 4 + attrsLen; @@ -641,7 +649,7 @@ private ByteBuf buildTxtMsgRspPackage(MsgType msgType, String attrs, AbsV0MsgCod } buffer.writeInt(attrsLen); if (attrsLen > 0) { - buffer.writeBytes(attrs.getBytes(StandardCharsets.UTF_8)); + buffer.writeBytes(byTeAttrs); } return buffer; } @@ -656,15 +664,16 @@ private ByteBuf buildTxtMsgRspPackage(MsgType msgType, String attrs, AbsV0MsgCod */ private ByteBuf buildHBRspPackage(byte[] attrData, byte version, int loadValue) { // calculate total length - // binTotalLen = mstType + dataTime + body_ver + bodyLen + body + attrsLen + attrs + magic - int binTotalLen = 1 + 4 + 1 + 4 + 2 + 2 + 2; + int attrsLen = 0; if (null != attrData) { - binTotalLen += attrData.length; + attrsLen = attrData.length; } // check load value if (loadValue == 0 || loadValue == (-1)) { loadValue = 0xffff; } + // binTotalLen = mstType + dataTime + version + bodyLen + body + attrsLen + attrs + magic + int binTotalLen = 1 + 4 + 1 + 4 + 2 + 2 + attrsLen + 2; // allocate buffer and write fields ByteBuf binBuffer = ByteBufAllocator.DEFAULT.buffer(4 + binTotalLen); binBuffer.writeInt(binTotalLen); @@ -673,11 +682,9 @@ private ByteBuf buildHBRspPackage(byte[] attrData, byte version, int loadValue) binBuffer.writeByte(version); binBuffer.writeInt(2); binBuffer.writeShort(loadValue); - if (null != attrData) { - binBuffer.writeShort(attrData.length); + binBuffer.writeShort(attrsLen); + if (attrsLen > 0) { binBuffer.writeBytes(attrData); - } else { - binBuffer.writeShort(0x0); } binBuffer.writeShort(0xee01); return binBuffer; diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecTextMsg.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecTextMsg.java index 746cac9ca08..68fb139957c 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecTextMsg.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecTextMsg.java @@ -259,8 +259,12 @@ public Event encEventPackage(BaseSource source, Channel channel) { inLongMsg.addMsg(mapJoiner.join(attrMap), bodyBuffer); calcCnt++; } - attrMap.put(AttributeConstants.MESSAGE_COUNT, String.valueOf(calcCnt)); - this.msgCount = calcCnt; + if (calcCnt != this.msgCount) { + this.msgCount = calcCnt; + source.fileMetricIncWithDetailStats( + StatConstants.EVENT_MSG_TYPE_5_CNT_UNEQUAL, groupId); + } + attrMap.put(AttributeConstants.MESSAGE_COUNT, String.valueOf(this.msgCount)); } else if (MsgType.MSG_MULTI_BODY_ATTR.equals(MsgType.valueOf(msgType))) { attrMap.put(AttributeConstants.MESSAGE_COUNT, String.valueOf(1)); inLongMsg.addMsg(mapJoiner.join(attrMap), bodyData);