From 95ac2e0663b88f873d44fba93cc171aefd06c4ea Mon Sep 17 00:00:00 2001 From: mxsm Date: Mon, 1 Jan 2024 11:18:17 +0800 Subject: [PATCH 1/7] [ISSUE #4701]fix use tcp protocol client send message, it throw a DecoderException --- .../common/protocol/tcp/codec/Codec.java | 213 ++++++++---------- .../common/protocol/tcp/codec/CodecTest.java | 4 +- .../runtime/boot/AbstractTCPServer.java | 3 +- 3 files changed, 100 insertions(+), 120 deletions(-) diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java index cded7d67e7..f59a0a9987 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java @@ -36,8 +36,6 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageCodec; -import io.netty.handler.codec.MessageToByteEncoder; -import io.netty.handler.codec.ReplayingDecoder; import com.fasterxml.jackson.core.JsonProcessingException; @@ -48,139 +46,122 @@ @Slf4j public class Codec extends ByteToMessageCodec { - private static final int FRAME_MAX_LENGTH = 1024 * 1024 * 4; + private static final int FRAME_MAX_LENGTH = 1024 * 1024 * 4; // 4M private static final byte[] CONSTANT_MAGIC_FLAG = serializeBytes("EventMesh"); private static final byte[] VERSION = serializeBytes("0000"); - private Encoder encoder = new Encoder(); - private Decoder decoder = new Decoder(); - @Override protected void encode(ChannelHandlerContext ctx, Package pkg, ByteBuf out) throws Exception { - encoder.encode(ctx, pkg, out); - } + Preconditions.checkNotNull(pkg, "TcpPackage cannot be null"); + final Header header = pkg.getHeader(); + Preconditions.checkNotNull(header, "TcpPackage header cannot be null", header); + if (log.isDebugEnabled()) { + log.debug("Encoder pkg={}", JsonUtils.toJSONString(pkg)); + } - @Override - protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { - decoder.decode(ctx, in, out); - } + final byte[] headerData = JsonUtils.toJSONBytes(header); + final byte[] bodyData; - public static class Encoder extends MessageToByteEncoder { - - @Override - public void encode(ChannelHandlerContext ctx, Package pkg, ByteBuf out) throws Exception { - Preconditions.checkNotNull(pkg, "TcpPackage cannot be null"); - final Header header = pkg.getHeader(); - Preconditions.checkNotNull(header, "TcpPackage header cannot be null", header); - if (log.isDebugEnabled()) { - log.debug("Encoder pkg={}", JsonUtils.toJSONString(pkg)); - } - - final byte[] headerData = JsonUtils.toJSONBytes(header); - final byte[] bodyData; - - if (StringUtils.equals(Constants.CLOUD_EVENTS_PROTOCOL_NAME, header.getStringProperty(Constants.PROTOCOL_TYPE))) { - bodyData = (byte[]) pkg.getBody(); - } else { - bodyData = JsonUtils.toJSONBytes(pkg.getBody()); - } - - int headerLength = ArrayUtils.getLength(headerData); - int bodyLength = ArrayUtils.getLength(bodyData); - - final int length = CONSTANT_MAGIC_FLAG.length + VERSION.length + headerLength + bodyLength; - - if (length > FRAME_MAX_LENGTH) { - throw new IllegalArgumentException("message size is exceed limit!"); - } - /** - * Header + Body, Format: - *
-             * ┌───────────────┬─────────────┬──────────────────┬──────────────────┬──────────────────┬─────────────────┐
-             * │   MAGIC_FLAG  │   VERSION   │ package length   │   Header length  │      Header      │      body       │
-             * │    (9bytes)   │  (4bytes)   │    (4bytes)      │      (4bytes)    │   (header bytes) │   (body bytes)  │
-             * └───────────────┴─────────────┴──────────────────┴──────────────────┴──────────────────┴─────────────────┘
-             * 
- */ - out.writeBytes(CONSTANT_MAGIC_FLAG); - out.writeBytes(VERSION); - out.writeInt(length); - out.writeInt(headerLength); - if (headerData != null) { - out.writeBytes(headerData); - } - if (bodyData != null) { - out.writeBytes(bodyData); - } + if (StringUtils.equals(Constants.CLOUD_EVENTS_PROTOCOL_NAME, header.getStringProperty(Constants.PROTOCOL_TYPE))) { + bodyData = (byte[]) pkg.getBody(); + } else { + bodyData = JsonUtils.toJSONBytes(pkg.getBody()); } - } - public static class Decoder extends ReplayingDecoder { - - @Override - public void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { - try { - if (null == in) { - return; - } - - byte[] flagBytes = parseFlag(in); - byte[] versionBytes = parseVersion(in); - validateFlag(flagBytes, versionBytes, ctx); - - final int length = in.readInt(); - final int headerLength = in.readInt(); - final int bodyLength = length - CONSTANT_MAGIC_FLAG.length - VERSION.length - headerLength; - Header header = parseHeader(in, headerLength); - Object body = parseBody(in, header, bodyLength); - - Package pkg = new Package(header, body); - out.add(pkg); - } catch (Exception e) { - log.error("decode error| received data: {}.", deserializeBytes(in.array()), e); - throw e; - } - } + int headerLength = ArrayUtils.getLength(headerData); + int bodyLength = ArrayUtils.getLength(bodyData); + + final int length = CONSTANT_MAGIC_FLAG.length + VERSION.length + headerLength + bodyLength; - private byte[] parseFlag(ByteBuf in) { - final byte[] flagBytes = new byte[CONSTANT_MAGIC_FLAG.length]; - in.readBytes(flagBytes); - return flagBytes; + if (length > FRAME_MAX_LENGTH) { + throw new IllegalArgumentException("message size is exceed limit!"); + } + /** + * Header + Body, Format: + *
+         * ┌───────────────┬─────────────┬──────────────────┬──────────────────┬──────────────────┬─────────────────┐
+         * │   MAGIC_FLAG  │   VERSION   │ package length   │   Header length  │      Header      │      body       │
+         * │    (9bytes)   │  (4bytes)   │    (4bytes)      │      (4bytes)    │   (header bytes) │   (body bytes)  │
+         * └───────────────┴─────────────┴──────────────────┴──────────────────┴──────────────────┴─────────────────┘
+         * 
+ */ + out.writeBytes(CONSTANT_MAGIC_FLAG); + out.writeBytes(VERSION); + out.writeInt(length); + out.writeInt(headerLength); + if (headerData != null) { + out.writeBytes(headerData); } + if (bodyData != null) { + out.writeBytes(bodyData); + } + } - private byte[] parseVersion(ByteBuf in) { - final byte[] versionBytes = new byte[VERSION.length]; - in.readBytes(versionBytes); - return versionBytes; + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + if (in == null) { + return; + } + if (in.readableBytes() < CONSTANT_MAGIC_FLAG.length + VERSION.length + 4 + 4) { + // Not enough data to read the package length and header length + return; } + byte[] flagBytes = parseFlag(in); + byte[] versionBytes = parseVersion(in); + validateFlagAndVersion(flagBytes, versionBytes, ctx); + final int packageLength = in.readInt(); + final int headerLength = in.readInt(); + if (in.readableBytes() < packageLength - 13) { + // Not enough data yet, reset the reader index and wait for more data + in.resetReaderIndex(); + return; + } + final int bodyLength = packageLength - CONSTANT_MAGIC_FLAG.length - VERSION.length - headerLength; + Header header = parseHeader(in, headerLength); + Object body = parseBody(in, header, bodyLength); - private Header parseHeader(ByteBuf in, int headerLength) throws JsonProcessingException { - if (headerLength <= 0) { - return null; - } - final byte[] headerData = new byte[headerLength]; - in.readBytes(headerData); - LogUtils.debug(log, "Decode headerJson={}", deserializeBytes(headerData)); - return JsonUtils.parseObject(headerData, Header.class); + Package pkg = new Package(header, body); + out.add(pkg); + } + + private byte[] parseFlag(ByteBuf in) { + final byte[] flagBytes = new byte[CONSTANT_MAGIC_FLAG.length]; + in.readBytes(flagBytes); + return flagBytes; + } + + private byte[] parseVersion(ByteBuf in) { + final byte[] versionBytes = new byte[VERSION.length]; + in.readBytes(versionBytes); + return versionBytes; + } + + private Header parseHeader(ByteBuf in, int headerLength) throws JsonProcessingException { + if (headerLength <= 0) { + return null; } + final byte[] headerData = new byte[headerLength]; + in.readBytes(headerData); + LogUtils.debug(log, "Decode headerJson={}", deserializeBytes(headerData)); + return JsonUtils.parseObject(headerData, Header.class); + } - private Object parseBody(ByteBuf in, Header header, int bodyLength) throws JsonProcessingException { - if (bodyLength <= 0 || header == null) { - return null; - } - final byte[] bodyData = new byte[bodyLength]; - in.readBytes(bodyData); - LogUtils.debug(log, "Decode bodyJson={}", deserializeBytes(bodyData)); - return deserializeBody(deserializeBytes(bodyData), header); + private Object parseBody(ByteBuf in, Header header, int bodyLength) throws JsonProcessingException { + if (bodyLength <= 0 || header == null) { + return null; } + final byte[] bodyData = new byte[bodyLength]; + in.readBytes(bodyData); + LogUtils.debug(log, "Decode bodyJson={}", deserializeBytes(bodyData)); + return deserializeBody(deserializeBytes(bodyData), header); + } - private void validateFlag(byte[] flagBytes, byte[] versionBytes, ChannelHandlerContext ctx) { - if (!Arrays.equals(flagBytes, CONSTANT_MAGIC_FLAG) || !Arrays.equals(versionBytes, VERSION)) { - String errorMsg = String.format("invalid magic flag or version|flag=%s|version=%s|remoteAddress=%s", - deserializeBytes(flagBytes), deserializeBytes(versionBytes), ctx.channel().remoteAddress()); - throw new IllegalArgumentException(errorMsg); - } + private void validateFlagAndVersion(byte[] flagBytes, byte[] versionBytes, ChannelHandlerContext ctx) { + if (!Arrays.equals(flagBytes, CONSTANT_MAGIC_FLAG) || !Arrays.equals(versionBytes, VERSION)) { + String errorMsg = String.format("invalid magic flag or version|flag=%s|version=%s|remoteAddress=%s", + deserializeBytes(flagBytes), deserializeBytes(versionBytes), ctx.channel().remoteAddress()); + throw new IllegalArgumentException(errorMsg); } } diff --git a/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/tcp/codec/CodecTest.java b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/tcp/codec/CodecTest.java index 29724e9b66..462fe1507e 100644 --- a/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/tcp/codec/CodecTest.java +++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/tcp/codec/CodecTest.java @@ -37,10 +37,10 @@ public void testCodec() throws Exception { header.setCmd(Command.HELLO_REQUEST); Package testP = new Package(header); testP.setBody(new Object()); - Codec.Encoder ce = new Codec.Encoder(); + Codec ce = new Codec(); ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(); ce.encode(null, testP, buf); - Codec.Decoder cd = new Codec.Decoder(); + Codec cd = new Codec(); ArrayList result = new ArrayList<>(); cd.decode(null, buf, result); Assertions.assertNotNull(result.get(0)); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractTCPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractTCPServer.java index be19e3d63a..4fcf6520c9 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractTCPServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractTCPServer.java @@ -187,8 +187,7 @@ private class TcpServerInitializer extends ChannelInitializer { protected void initChannel(SocketChannel ch) { globalTrafficShapingHandler = newGTSHandler(tcpThreadPoolGroup.getScheduler(), eventMeshTCPConfiguration.getCtc().getReadLimit()); ch.pipeline() - .addLast(getWorkerGroup(), new Codec.Encoder()) - .addLast(getWorkerGroup(), new Codec.Decoder()) + .addLast(getWorkerGroup(), new Codec()) .addLast(getWorkerGroup(), "global-traffic-shaping", globalTrafficShapingHandler) .addLast(getWorkerGroup(), "channel-traffic-shaping", newCTSHandler(eventMeshTCPConfiguration.getCtc().getReadLimit())) .addLast(getWorkerGroup(), tcpConnectionHandler) From 7b36433edb0734e64bd9051a7e6d893d82ba045d Mon Sep 17 00:00:00 2001 From: mxsm Date: Mon, 1 Jan 2024 21:37:59 +0800 Subject: [PATCH 2/7] optmize code --- .../apache/eventmesh/common/protocol/tcp/codec/Codec.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java index f59a0a9987..9148f33d22 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java @@ -103,7 +103,8 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) t if (in == null) { return; } - if (in.readableBytes() < CONSTANT_MAGIC_FLAG.length + VERSION.length + 4 + 4) { + final int prefixLength = CONSTANT_MAGIC_FLAG.length + VERSION.length; + if (in.readableBytes() < prefixLength + 4 + 4) { // Not enough data to read the package length and header length return; } @@ -112,12 +113,12 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) t validateFlagAndVersion(flagBytes, versionBytes, ctx); final int packageLength = in.readInt(); final int headerLength = in.readInt(); - if (in.readableBytes() < packageLength - 13) { + if (in.readableBytes() < packageLength - prefixLength) { // Not enough data yet, reset the reader index and wait for more data in.resetReaderIndex(); return; } - final int bodyLength = packageLength - CONSTANT_MAGIC_FLAG.length - VERSION.length - headerLength; + final int bodyLength = packageLength - prefixLength - headerLength; Header header = parseHeader(in, headerLength); Object body = parseBody(in, header, bodyLength); From bab4dd24be6169a565cc5ba23fe6d0553412f223 Mon Sep 17 00:00:00 2001 From: mxsm Date: Sat, 6 Jan 2024 20:34:14 +0800 Subject: [PATCH 3/7] refactor with LengthFieldBasedFrameDecoder --- .../common/protocol/tcp/codec/Codec.java | 226 ++++++++++-------- .../common/protocol/tcp/codec/CodecTest.java | 12 +- .../runtime/boot/AbstractTCPServer.java | 3 +- .../client/tcp/common/TcpClient.java | 2 +- 4 files changed, 134 insertions(+), 109 deletions(-) diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java index 9148f33d22..babf1d4c07 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java @@ -25,18 +25,16 @@ import org.apache.eventmesh.common.protocol.tcp.Subscription; import org.apache.eventmesh.common.protocol.tcp.UserAgent; import org.apache.eventmesh.common.utils.JsonUtils; -import org.apache.eventmesh.common.utils.LogUtils; import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.StringUtils; import java.util.Arrays; -import java.util.List; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.ByteToMessageCodec; - +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import io.netty.handler.codec.MessageToByteEncoder; import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.base.Preconditions; @@ -44,125 +42,149 @@ import lombok.extern.slf4j.Slf4j; @Slf4j -public class Codec extends ByteToMessageCodec { +public class Codec { - private static final int FRAME_MAX_LENGTH = 1024 * 1024 * 4; // 4M + private static final int FRAME_MAX_LENGTH = 1024 * 1024 * 4; private static final byte[] CONSTANT_MAGIC_FLAG = serializeBytes("EventMesh"); private static final byte[] VERSION = serializeBytes("0000"); - @Override - protected void encode(ChannelHandlerContext ctx, Package pkg, ByteBuf out) throws Exception { - Preconditions.checkNotNull(pkg, "TcpPackage cannot be null"); - final Header header = pkg.getHeader(); - Preconditions.checkNotNull(header, "TcpPackage header cannot be null", header); - if (log.isDebugEnabled()) { - log.debug("Encoder pkg={}", JsonUtils.toJSONString(pkg)); - } + public static final int PREFIX_LENGTH = CONSTANT_MAGIC_FLAG.length + VERSION.length; - final byte[] headerData = JsonUtils.toJSONBytes(header); - final byte[] bodyData; + public static class Encoder extends MessageToByteEncoder { - if (StringUtils.equals(Constants.CLOUD_EVENTS_PROTOCOL_NAME, header.getStringProperty(Constants.PROTOCOL_TYPE))) { - bodyData = (byte[]) pkg.getBody(); - } else { - bodyData = JsonUtils.toJSONBytes(pkg.getBody()); - } + @Override + public void encode(ChannelHandlerContext ctx, Package pkg, ByteBuf out) throws Exception { + Preconditions.checkNotNull(pkg, "TcpPackage cannot be null"); + final Header header = pkg.getHeader(); + Preconditions.checkNotNull(header, "TcpPackage header cannot be null", header); + if (log.isDebugEnabled()) { + log.debug("Encoder pkg={}", JsonUtils.toJSONString(pkg)); + } - int headerLength = ArrayUtils.getLength(headerData); - int bodyLength = ArrayUtils.getLength(bodyData); + final byte[] headerData = JsonUtils.toJSONBytes(header); + final byte[] bodyData; - final int length = CONSTANT_MAGIC_FLAG.length + VERSION.length + headerLength + bodyLength; + if (StringUtils.equals(Constants.CLOUD_EVENTS_PROTOCOL_NAME, header.getStringProperty(Constants.PROTOCOL_TYPE))) { + bodyData = (byte[]) pkg.getBody(); + } else { + bodyData = JsonUtils.toJSONBytes(pkg.getBody()); + } - if (length > FRAME_MAX_LENGTH) { - throw new IllegalArgumentException("message size is exceed limit!"); - } - /** - * Header + Body, Format: - *
-         * ┌───────────────┬─────────────┬──────────────────┬──────────────────┬──────────────────┬─────────────────┐
-         * │   MAGIC_FLAG  │   VERSION   │ package length   │   Header length  │      Header      │      body       │
-         * │    (9bytes)   │  (4bytes)   │    (4bytes)      │      (4bytes)    │   (header bytes) │   (body bytes)  │
-         * └───────────────┴─────────────┴──────────────────┴──────────────────┴──────────────────┴─────────────────┘
-         * 
- */ - out.writeBytes(CONSTANT_MAGIC_FLAG); - out.writeBytes(VERSION); - out.writeInt(length); - out.writeInt(headerLength); - if (headerData != null) { - out.writeBytes(headerData); - } - if (bodyData != null) { - out.writeBytes(bodyData); + int headerLength = ArrayUtils.getLength(headerData); + int bodyLength = ArrayUtils.getLength(bodyData); + + final int length = PREFIX_LENGTH + headerLength + bodyLength; + + if (length > FRAME_MAX_LENGTH) { + throw new IllegalArgumentException("message size is exceed limit!"); + } + /** + * Header + Body, Format: + *
+             * ┌───────────────┬─────────────┬──────────────────┬──────────────────┬──────────────────┬─────────────────┐
+             * │   MAGIC_FLAG  │   VERSION   │ package length   │   Header length  │      Header      │      body       │
+             * │    (9bytes)   │  (4bytes)   │    (4bytes)      │      (4bytes)    │   (header bytes) │   (body bytes)  │
+             * └───────────────┴─────────────┴──────────────────┴──────────────────┴──────────────────┴─────────────────┘
+             * 
+ */ + out.writeBytes(CONSTANT_MAGIC_FLAG); + out.writeBytes(VERSION); + out.writeInt(length); + out.writeInt(headerLength); + if (headerData != null) { + out.writeBytes(headerData); + } + if (bodyData != null) { + out.writeBytes(bodyData); + } } } - @Override - protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { - if (in == null) { - return; - } - final int prefixLength = CONSTANT_MAGIC_FLAG.length + VERSION.length; - if (in.readableBytes() < prefixLength + 4 + 4) { - // Not enough data to read the package length and header length - return; - } - byte[] flagBytes = parseFlag(in); - byte[] versionBytes = parseVersion(in); - validateFlagAndVersion(flagBytes, versionBytes, ctx); - final int packageLength = in.readInt(); - final int headerLength = in.readInt(); - if (in.readableBytes() < packageLength - prefixLength) { - // Not enough data yet, reset the reader index and wait for more data - in.resetReaderIndex(); - return; + public static class Decoder extends LengthFieldBasedFrameDecoder { + + + public Decoder() { + super(FRAME_MAX_LENGTH, 13, 4, -9, 0); } - final int bodyLength = packageLength - prefixLength - headerLength; - Header header = parseHeader(in, headerLength); - Object body = parseBody(in, header, bodyLength); - Package pkg = new Package(header, body); - out.add(pkg); - } + @Override + protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { - private byte[] parseFlag(ByteBuf in) { - final byte[] flagBytes = new byte[CONSTANT_MAGIC_FLAG.length]; - in.readBytes(flagBytes); - return flagBytes; - } + ByteBuf target = null; - private byte[] parseVersion(ByteBuf in) { - final byte[] versionBytes = new byte[VERSION.length]; - in.readBytes(versionBytes); - return versionBytes; - } + try { + target = (ByteBuf) super.decode(ctx, in); + if (null == target) { + return null; + } + byte[] flagBytes = parseFlag(target); + byte[] versionBytes = parseVersion(target); + validateFlag(flagBytes, versionBytes, ctx); + + final int length = target.readInt(); + final int headerLength = target.readInt(); + final int bodyLength = length - PREFIX_LENGTH - headerLength; + Header header = parseHeader(target, headerLength); + Object body = parseBody(target, header, bodyLength); + + Package pkg = new Package(header, body); + return pkg; + + } catch (Exception ex) { + log.error("decode error", ex); + ctx.channel().close(); + } finally { + if (target != null) { + target.release(); + } + } - private Header parseHeader(ByteBuf in, int headerLength) throws JsonProcessingException { - if (headerLength <= 0) { return null; } - final byte[] headerData = new byte[headerLength]; - in.readBytes(headerData); - LogUtils.debug(log, "Decode headerJson={}", deserializeBytes(headerData)); - return JsonUtils.parseObject(headerData, Header.class); - } - private Object parseBody(ByteBuf in, Header header, int bodyLength) throws JsonProcessingException { - if (bodyLength <= 0 || header == null) { - return null; + private byte[] parseFlag(ByteBuf in) { + final byte[] flagBytes = new byte[CONSTANT_MAGIC_FLAG.length]; + in.readBytes(flagBytes); + return flagBytes; + } + + private byte[] parseVersion(ByteBuf in) { + final byte[] versionBytes = new byte[VERSION.length]; + in.readBytes(versionBytes); + return versionBytes; + } + + private Header parseHeader(ByteBuf in, int headerLength) throws JsonProcessingException { + if (headerLength <= 0) { + return null; + } + final byte[] headerData = new byte[headerLength]; + in.readBytes(headerData); + if (log.isDebugEnabled()) { + log.debug("Decode headerJson={}", deserializeBytes(headerData)); + } + return JsonUtils.parseObject(headerData, Header.class); + } + + private Object parseBody(ByteBuf in, Header header, int bodyLength) throws JsonProcessingException { + if (bodyLength <= 0 || header == null) { + return null; + } + final byte[] bodyData = new byte[bodyLength]; + in.readBytes(bodyData); + if (log.isDebugEnabled()) { + log.debug("Decode bodyJson={}", deserializeBytes(bodyData)); + } + return deserializeBody(deserializeBytes(bodyData), header); } - final byte[] bodyData = new byte[bodyLength]; - in.readBytes(bodyData); - LogUtils.debug(log, "Decode bodyJson={}", deserializeBytes(bodyData)); - return deserializeBody(deserializeBytes(bodyData), header); - } - private void validateFlagAndVersion(byte[] flagBytes, byte[] versionBytes, ChannelHandlerContext ctx) { - if (!Arrays.equals(flagBytes, CONSTANT_MAGIC_FLAG) || !Arrays.equals(versionBytes, VERSION)) { - String errorMsg = String.format("invalid magic flag or version|flag=%s|version=%s|remoteAddress=%s", - deserializeBytes(flagBytes), deserializeBytes(versionBytes), ctx.channel().remoteAddress()); - throw new IllegalArgumentException(errorMsg); + private void validateFlag(byte[] flagBytes, byte[] versionBytes, ChannelHandlerContext ctx) { + if (!Arrays.equals(flagBytes, CONSTANT_MAGIC_FLAG) || !Arrays.equals(versionBytes, VERSION)) { + String errorMsg = String.format("invalid magic flag or version|flag=%s|version=%s|remoteAddress=%s", + deserializeBytes(flagBytes), deserializeBytes(versionBytes), ctx.channel().remoteAddress()); + throw new IllegalArgumentException(errorMsg); + } } } @@ -205,7 +227,9 @@ private static Object deserializeBody(String bodyJsonString, Header header) thro case REDIRECT_TO_CLIENT: return JsonUtils.parseObject(bodyJsonString, RedirectInfo.class); default: - LogUtils.warn(log, "Invalidate TCP command: {}", command); + if (log.isWarnEnabled()) { + log.warn("Invalidate TCP command: {}", command); + } return null; } } diff --git a/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/tcp/codec/CodecTest.java b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/tcp/codec/CodecTest.java index 462fe1507e..7c66e0c6e8 100644 --- a/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/tcp/codec/CodecTest.java +++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/tcp/codec/CodecTest.java @@ -20,6 +20,8 @@ import org.apache.eventmesh.common.protocol.tcp.Command; import org.apache.eventmesh.common.protocol.tcp.Header; import org.apache.eventmesh.common.protocol.tcp.Package; +import org.apache.eventmesh.common.protocol.tcp.codec.Codec.Decoder; +import org.apache.eventmesh.common.protocol.tcp.codec.Codec.Encoder; import java.util.ArrayList; @@ -37,14 +39,12 @@ public void testCodec() throws Exception { header.setCmd(Command.HELLO_REQUEST); Package testP = new Package(header); testP.setBody(new Object()); - Codec ce = new Codec(); + Encoder ce = new Codec.Encoder(); ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(); ce.encode(null, testP, buf); - Codec cd = new Codec(); - ArrayList result = new ArrayList<>(); - cd.decode(null, buf, result); - Assertions.assertNotNull(result.get(0)); - Assertions.assertEquals(testP.getHeader(), ((Package) result.get(0)).getHeader()); + Decoder cd = new Codec.Decoder(); + final Package decode = (Package)cd.decode(null, buf); + Assertions.assertEquals(testP.getHeader(), decode.getHeader()); } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractTCPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractTCPServer.java index 4fcf6520c9..be19e3d63a 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractTCPServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractTCPServer.java @@ -187,7 +187,8 @@ private class TcpServerInitializer extends ChannelInitializer { protected void initChannel(SocketChannel ch) { globalTrafficShapingHandler = newGTSHandler(tcpThreadPoolGroup.getScheduler(), eventMeshTCPConfiguration.getCtc().getReadLimit()); ch.pipeline() - .addLast(getWorkerGroup(), new Codec()) + .addLast(getWorkerGroup(), new Codec.Encoder()) + .addLast(getWorkerGroup(), new Codec.Decoder()) .addLast(getWorkerGroup(), "global-traffic-shaping", globalTrafficShapingHandler) .addLast(getWorkerGroup(), "channel-traffic-shaping", newCTSHandler(eventMeshTCPConfiguration.getCtc().getReadLimit())) .addLast(getWorkerGroup(), tcpConnectionHandler) diff --git a/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/TcpClient.java b/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/TcpClient.java index a8b164c6db..c42d9af274 100644 --- a/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/TcpClient.java +++ b/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/TcpClient.java @@ -110,7 +110,7 @@ protected synchronized void open(SimpleChannelInboundHandler handler) t @Override public void initChannel(SocketChannel ch) { - ch.pipeline().addLast(new Codec()) + ch.pipeline().addLast(new Codec.Encoder(), new Codec.Decoder()) .addLast(handler, newExceptionHandler()); } }); From 4e1dbae9ac2d1e6cac558dc8328e94f0fb4a84ab Mon Sep 17 00:00:00 2001 From: mxsm Date: Sat, 6 Jan 2024 20:41:19 +0800 Subject: [PATCH 4/7] fix code style --- .../org/apache/eventmesh/common/protocol/tcp/codec/Codec.java | 1 - .../apache/eventmesh/common/protocol/tcp/codec/CodecTest.java | 4 +--- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java index babf1d4c07..8823e02e1a 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java @@ -103,7 +103,6 @@ public void encode(ChannelHandlerContext ctx, Package pkg, ByteBuf out) throws E public static class Decoder extends LengthFieldBasedFrameDecoder { - public Decoder() { super(FRAME_MAX_LENGTH, 13, 4, -9, 0); } diff --git a/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/tcp/codec/CodecTest.java b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/tcp/codec/CodecTest.java index 7c66e0c6e8..19907ffd2b 100644 --- a/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/tcp/codec/CodecTest.java +++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/tcp/codec/CodecTest.java @@ -23,8 +23,6 @@ import org.apache.eventmesh.common.protocol.tcp.codec.Codec.Decoder; import org.apache.eventmesh.common.protocol.tcp.codec.Codec.Encoder; -import java.util.ArrayList; - import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -43,7 +41,7 @@ public void testCodec() throws Exception { ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(); ce.encode(null, testP, buf); Decoder cd = new Codec.Decoder(); - final Package decode = (Package)cd.decode(null, buf); + final Package decode = (Package) cd.decode(null, buf); Assertions.assertEquals(testP.getHeader(), decode.getHeader()); } From 413f44bda302e2c1942a3a60731da21179f9afb2 Mon Sep 17 00:00:00 2001 From: mxsm Date: Mon, 8 Jan 2024 00:06:21 +0800 Subject: [PATCH 5/7] optimize code --- .../org/apache/eventmesh/common/protocol/tcp/codec/Codec.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java index 8823e02e1a..14c831ccd6 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java @@ -104,7 +104,7 @@ public void encode(ChannelHandlerContext ctx, Package pkg, ByteBuf out) throws E public static class Decoder extends LengthFieldBasedFrameDecoder { public Decoder() { - super(FRAME_MAX_LENGTH, 13, 4, -9, 0); + super(FRAME_MAX_LENGTH, PREFIX_LENGTH, Integer.BYTES, -9, 0); } @Override From fdca40ae8aae02436ccfc80abf6f9fe6c52ca9c1 Mon Sep 17 00:00:00 2001 From: mxsm Date: Mon, 8 Jan 2024 00:09:16 +0800 Subject: [PATCH 6/7] fix log print --- .../eventmesh/common/protocol/tcp/codec/Codec.java | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java index 14c831ccd6..ad95aa0981 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java @@ -25,6 +25,7 @@ import org.apache.eventmesh.common.protocol.tcp.Subscription; import org.apache.eventmesh.common.protocol.tcp.UserAgent; import org.apache.eventmesh.common.utils.JsonUtils; +import org.apache.eventmesh.common.utils.LogUtils; import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.StringUtils; @@ -160,9 +161,7 @@ private Header parseHeader(ByteBuf in, int headerLength) throws JsonProcessingEx } final byte[] headerData = new byte[headerLength]; in.readBytes(headerData); - if (log.isDebugEnabled()) { - log.debug("Decode headerJson={}", deserializeBytes(headerData)); - } + LogUtils.debug(log, "Decode headerJson={}", deserializeBytes(headerData)); return JsonUtils.parseObject(headerData, Header.class); } @@ -172,9 +171,7 @@ private Object parseBody(ByteBuf in, Header header, int bodyLength) throws JsonP } final byte[] bodyData = new byte[bodyLength]; in.readBytes(bodyData); - if (log.isDebugEnabled()) { - log.debug("Decode bodyJson={}", deserializeBytes(bodyData)); - } + LogUtils.debug(log, "Decode headerJson={}", deserializeBytes(bodyData)); return deserializeBody(deserializeBytes(bodyData), header); } From 32656bccde921360a9bbeb8fae7f7845243a1bbc Mon Sep 17 00:00:00 2001 From: mxsm Date: Mon, 8 Jan 2024 23:24:17 +0800 Subject: [PATCH 7/7] optimize code and add some comments --- .../common/protocol/tcp/codec/Codec.java | 29 +++++++++++++++---- 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java index ad95aa0981..2417021b6a 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java @@ -50,7 +50,9 @@ public class Codec { private static final byte[] CONSTANT_MAGIC_FLAG = serializeBytes("EventMesh"); private static final byte[] VERSION = serializeBytes("0000"); - public static final int PREFIX_LENGTH = CONSTANT_MAGIC_FLAG.length + VERSION.length; + private static final int PREFIX_LENGTH = CONSTANT_MAGIC_FLAG.length + VERSION.length; //13 + + private static final int PACKAGE_BYTES_FIELD_LENGTH = 4; public static class Encoder extends MessageToByteEncoder { @@ -105,7 +107,24 @@ public void encode(ChannelHandlerContext ctx, Package pkg, ByteBuf out) throws E public static class Decoder extends LengthFieldBasedFrameDecoder { public Decoder() { - super(FRAME_MAX_LENGTH, PREFIX_LENGTH, Integer.BYTES, -9, 0); + /** + * lengthAdjustment value = -9 explain: + * Header + Body, Format: + *
+             * ┌───────────────┬─────────────┬──────────────────┬──────────────────┬──────────────────┬─────────────────┐
+             * │   MAGIC_FLAG  │   VERSION   │ package length   │   Header length  │      Header      │      body       │
+             * │    (9bytes)   │  (4bytes)   │    (4bytes)      │      (4bytes)    │   (header bytes) │   (body bytes)  │
+             * └───────────────┴─────────────┴──────────────────┴──────────────────┴──────────────────┴─────────────────┘
+             * 
+ * package length = MAGIC_FLAG + VERSION + Header length + Body length,Currently, + * adding MAGIC_FLAG + VERSION + package length field (4 bytes) actually adds 17 bytes. + * However, the value of the package length field is only reduced by the four bytes of + * the package length field itself and the four bytes of the header length field. + * Therefore, the compensation value to be added to the length field value is -9, + * which means subtracting the extra 9 bytes. + * Refer to the encoding in the {@link Encoder} + */ + super(FRAME_MAX_LENGTH, PREFIX_LENGTH, PACKAGE_BYTES_FIELD_LENGTH, -9, 0); } @Override @@ -171,7 +190,7 @@ private Object parseBody(ByteBuf in, Header header, int bodyLength) throws JsonP } final byte[] bodyData = new byte[bodyLength]; in.readBytes(bodyData); - LogUtils.debug(log, "Decode headerJson={}", deserializeBytes(bodyData)); + LogUtils.debug(log, "Decode bodyJson={}", deserializeBytes(bodyData)); return deserializeBody(deserializeBytes(bodyData), header); } @@ -223,9 +242,7 @@ private static Object deserializeBody(String bodyJsonString, Header header) thro case REDIRECT_TO_CLIENT: return JsonUtils.parseObject(bodyJsonString, RedirectInfo.class); default: - if (log.isWarnEnabled()) { - log.warn("Invalidate TCP command: {}", command); - } + LogUtils.warn(log, "Invalidate TCP command: {}", command); return null; } }