Skip to content

Commit

Permalink
[INLONG-11410][DataProxy] Fixed the bug of excessive packet length wh…
Browse files Browse the repository at this point in the history
…en Go SDK receives DataProxy response (apache#11411)



Co-authored-by: gosonzhang <[email protected]>
  • Loading branch information
gosonzhang and gosonzhang authored Oct 25, 2024
1 parent 47f33c3 commit dda0897
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public class StatConstants {
public static final java.lang.String EVENT_MSG_TXT_LEN_MALFORMED = "msg.txt.len.malformed";
public static final java.lang.String EVENT_MSG_ITEM_LEN_MALFORMED = "msg.item.len.malformed";
public static final java.lang.String EVENT_MSG_TYPE_5_LEN_MALFORMED = "msg.type5.len.malformed";
public static final java.lang.String EVENT_MSG_TYPE_5_CNT_UNEQUAL = "msg.type5.cnt.unequal";
public static final java.lang.String EVENT_MSG_ATTR_INVALID = "msg.attr.invalid";
public static final java.lang.String EVENT_MSG_ORDER_ACK_INVALID = "msg.attr.order.noack";
public static final java.lang.String EVENT_MSG_PROXY_ACK_INVALID = "msg.attr.proxy.noack";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -558,11 +558,14 @@ private void procBinHeartbeatMsg(BaseSource source, Channel channel,
*/
public static ByteBuf buildBinMsgRspPackage(String attrs, long uniqVal) {
// calculate total length
// binTotalLen = mstType + uniq + attrsLen + attrs + magic
int binTotalLen = 1 + 4 + 2 + 2;
int attrsLen = 0;
byte[] byTeAttrs = null;
if (null != attrs) {
binTotalLen += attrs.length();
byTeAttrs = attrs.getBytes(StandardCharsets.UTF_8);
attrsLen = byTeAttrs.length;
}
// binTotalLen = mstType + uniq + attrsLen + attrs + magic
int binTotalLen = 1 + 4 + 2 + attrsLen + 2;
// allocate buffer and write fields
ByteBuf binBuffer = ByteBufAllocator.DEFAULT.buffer(4 + binTotalLen);
binBuffer.writeInt(binTotalLen);
Expand All @@ -573,11 +576,9 @@ public static ByteBuf buildBinMsgRspPackage(String attrs, long uniqVal) {
uniq[2] = (byte) ((uniqVal >> 8) & 0xFF);
uniq[3] = (byte) (uniqVal & 0xFF);
binBuffer.writeBytes(uniq);
if (null != attrs) {
binBuffer.writeShort(attrs.length());
binBuffer.writeBytes(attrs.getBytes(StandardCharsets.UTF_8));
} else {
binBuffer.writeShort(0x0);
binBuffer.writeShort(attrsLen);
if (attrsLen > 0) {
binBuffer.writeBytes(byTeAttrs);
}
binBuffer.writeShort(0xee01);
return binBuffer;
Expand All @@ -593,8 +594,10 @@ public static ByteBuf buildBinMsgRspPackage(String attrs, long uniqVal) {
public static ByteBuf buildTxtMsgRspPackage(MsgType msgType, String attrs) {
int attrsLen = 0;
int bodyLen = 0;
byte[] byTeAttrs = null;
if (attrs != null) {
attrsLen = attrs.length();
byTeAttrs = attrs.getBytes(StandardCharsets.UTF_8);
attrsLen = byTeAttrs.length;
}
// backTotalLen = mstType + bodyLen + body + attrsLen + attrs
int backTotalLen = 1 + 4 + bodyLen + 4 + attrsLen;
Expand All @@ -604,7 +607,7 @@ public static ByteBuf buildTxtMsgRspPackage(MsgType msgType, String attrs) {
buffer.writeInt(bodyLen);
buffer.writeInt(attrsLen);
if (attrsLen > 0) {
buffer.writeBytes(attrs.getBytes(StandardCharsets.UTF_8));
buffer.writeBytes(byTeAttrs);
}
return buffer;
}
Expand All @@ -620,15 +623,20 @@ public static ByteBuf buildTxtMsgRspPackage(MsgType msgType, String attrs) {
private ByteBuf buildTxtMsgRspPackage(MsgType msgType, String attrs, AbsV0MsgCodec msgObj) {
int attrsLen = 0;
int bodyLen = 0;
byte[] backBody = null;
byte[] backBody;
byte[] byTeAttrs = null;
if (attrs != null) {
attrsLen = attrs.length();
byTeAttrs = attrs.getBytes(StandardCharsets.UTF_8);
attrsLen = byTeAttrs.length;
}
if (MsgType.MSG_ORIGINAL_RETURN.equals(msgType)) {
backBody = msgObj.getOrigBody();
if (backBody != null) {
bodyLen = backBody.length;
}
} else {
backBody = new byte[]{50};
bodyLen = backBody.length;
}
// backTotalLen = mstType + bodyLen + body + attrsLen + attrs
int backTotalLen = 1 + 4 + bodyLen + 4 + attrsLen;
Expand All @@ -641,7 +649,7 @@ private ByteBuf buildTxtMsgRspPackage(MsgType msgType, String attrs, AbsV0MsgCod
}
buffer.writeInt(attrsLen);
if (attrsLen > 0) {
buffer.writeBytes(attrs.getBytes(StandardCharsets.UTF_8));
buffer.writeBytes(byTeAttrs);
}
return buffer;
}
Expand All @@ -656,15 +664,16 @@ private ByteBuf buildTxtMsgRspPackage(MsgType msgType, String attrs, AbsV0MsgCod
*/
private ByteBuf buildHBRspPackage(byte[] attrData, byte version, int loadValue) {
// calculate total length
// binTotalLen = mstType + dataTime + body_ver + bodyLen + body + attrsLen + attrs + magic
int binTotalLen = 1 + 4 + 1 + 4 + 2 + 2 + 2;
int attrsLen = 0;
if (null != attrData) {
binTotalLen += attrData.length;
attrsLen = attrData.length;
}
// check load value
if (loadValue == 0 || loadValue == (-1)) {
loadValue = 0xffff;
}
// binTotalLen = mstType + dataTime + version + bodyLen + body + attrsLen + attrs + magic
int binTotalLen = 1 + 4 + 1 + 4 + 2 + 2 + attrsLen + 2;
// allocate buffer and write fields
ByteBuf binBuffer = ByteBufAllocator.DEFAULT.buffer(4 + binTotalLen);
binBuffer.writeInt(binTotalLen);
Expand All @@ -673,11 +682,9 @@ private ByteBuf buildHBRspPackage(byte[] attrData, byte version, int loadValue)
binBuffer.writeByte(version);
binBuffer.writeInt(2);
binBuffer.writeShort(loadValue);
if (null != attrData) {
binBuffer.writeShort(attrData.length);
binBuffer.writeShort(attrsLen);
if (attrsLen > 0) {
binBuffer.writeBytes(attrData);
} else {
binBuffer.writeShort(0x0);
}
binBuffer.writeShort(0xee01);
return binBuffer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,12 @@ public Event encEventPackage(BaseSource source, Channel channel) {
inLongMsg.addMsg(mapJoiner.join(attrMap), bodyBuffer);
calcCnt++;
}
attrMap.put(AttributeConstants.MESSAGE_COUNT, String.valueOf(calcCnt));
this.msgCount = calcCnt;
if (calcCnt != this.msgCount) {
this.msgCount = calcCnt;
source.fileMetricIncWithDetailStats(
StatConstants.EVENT_MSG_TYPE_5_CNT_UNEQUAL, groupId);
}
attrMap.put(AttributeConstants.MESSAGE_COUNT, String.valueOf(this.msgCount));
} else if (MsgType.MSG_MULTI_BODY_ATTR.equals(MsgType.valueOf(msgType))) {
attrMap.put(AttributeConstants.MESSAGE_COUNT, String.valueOf(1));
inLongMsg.addMsg(mapJoiner.join(attrMap), bodyData);
Expand Down

0 comments on commit dda0897

Please sign in to comment.