diff --git a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/MethodDescriptor.java b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/MethodDescriptor.java index 2496d2dea31..77f87ede4ae 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/MethodDescriptor.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/MethodDescriptor.java @@ -51,6 +51,10 @@ public interface MethodDescriptor { Object getAttribute(String key); + Class[] getActualRequestTypes(); + + Class getActualResponseType(); + enum RpcType { UNARY, CLIENT_STREAM, diff --git a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ReflectionMethodDescriptor.java b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ReflectionMethodDescriptor.java index 7d4fd1208aa..ac0778d601a 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ReflectionMethodDescriptor.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ReflectionMethodDescriptor.java @@ -99,12 +99,17 @@ private RpcType determineRpcType() { boolean returnIsVoid = returnClass.getName().equals(void.class.getName()); if (returnIsVoid && parameterClasses.length == 1 && isStreamType(parameterClasses[0])) { actualRequestTypes = Collections.emptyList().toArray(new Class[0]); + actualResponseType = obtainActualTypeInStreamObserver( + ((ParameterizedType) method.getGenericParameterTypes()[0]).getActualTypeArguments()[0]); return RpcType.SERVER_STREAM; } if (returnIsVoid && parameterClasses.length == 2 && !isStreamType(parameterClasses[0]) && isStreamType(parameterClasses[1])) { + actualRequestTypes = parameterClasses; + actualResponseType = obtainActualTypeInStreamObserver( + ((ParameterizedType) method.getGenericParameterTypes()[1]).getActualTypeArguments()[0]); return RpcType.SERVER_STREAM; } if (Arrays.stream(parameterClasses).anyMatch(this::isStreamType) || isStreamType(returnClass)) { @@ -119,13 +124,6 @@ private boolean isStreamType(Class classType) { return StreamObserver.class.isAssignableFrom(classType); } - private static Class obtainActualTypeInStreamObserver(Type typeInStreamObserver) { - return (Class) - (typeInStreamObserver instanceof ParameterizedType - ? ((ParameterizedType) typeInStreamObserver).getRawType() - : typeInStreamObserver); - } - @Override public String getMethodName() { return methodName; @@ -179,14 +177,23 @@ public Object getAttribute(String key) { return this.attributeMap.get(key); } + @Override public Class[] getActualRequestTypes() { return actualRequestTypes; } + @Override public Class getActualResponseType() { return actualResponseType; } + private Class obtainActualTypeInStreamObserver(Type typeInStreamObserver) { + return (Class) + (typeInStreamObserver instanceof ParameterizedType + ? ((ParameterizedType) typeInStreamObserver).getRawType() + : typeInStreamObserver); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/StubMethodDescriptor.java b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/StubMethodDescriptor.java index 784c93e12bc..2e0c3f84a7e 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/StubMethodDescriptor.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/StubMethodDescriptor.java @@ -119,6 +119,16 @@ public Object getAttribute(String key) { return this.attributeMap.get(key); } + @Override + public Class[] getActualRequestTypes() { + return this.parameterClasses; + } + + @Override + public Class getActualResponseType() { + return this.returnClass; + } + @Override public Pack getRequestPack() { return requestPack; diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ReflectionPackableMethod.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ReflectionPackableMethod.java index 8fc8867a438..4fd388d3362 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ReflectionPackableMethod.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ReflectionPackableMethod.java @@ -73,19 +73,9 @@ public ReflectionPackableMethod( switch (method.getRpcType()) { case CLIENT_STREAM: case BI_STREAM: - actualRequestTypes = new Class[] { - obtainActualTypeInStreamObserver( - ((ParameterizedType) method.getMethod().getGenericReturnType()).getActualTypeArguments()[0]) - }; - actualResponseType = obtainActualTypeInStreamObserver( - ((ParameterizedType) method.getMethod().getGenericParameterTypes()[0]) - .getActualTypeArguments()[0]); - break; case SERVER_STREAM: - actualRequestTypes = method.getMethod().getParameterTypes(); - actualResponseType = obtainActualTypeInStreamObserver( - ((ParameterizedType) method.getMethod().getGenericParameterTypes()[1]) - .getActualTypeArguments()[0]); + actualRequestTypes = method.getActualRequestTypes(); + actualResponseType = method.getActualResponseType(); break; case UNARY: actualRequestTypes = method.getParameterClasses(); @@ -411,6 +401,9 @@ public byte[] pack(Object obj) throws IOException { for (String type : argumentsType) { builder.addArgTypes(type); } + if (actualRequestTypes == null || actualRequestTypes.length == 0) { + return builder.build().toByteArray(); + } ByteArrayOutputStream bos = new ByteArrayOutputStream(); for (int i = 0; i < arguments.length; i++) { Object argument = arguments[i]; diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java index 6a26fbc32f2..278524e4acc 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java @@ -207,10 +207,20 @@ private static boolean isSync(MethodDescriptor methodDescriptor, Invocation invo AsyncRpcResult invokeServerStream(MethodDescriptor methodDescriptor, Invocation invocation, ClientCall call) { RequestMetadata request = createRequest(methodDescriptor, invocation, null); - StreamObserver responseObserver = - (StreamObserver) invocation.getArguments()[1]; - final StreamObserver requestObserver = streamCall(call, request, responseObserver); - requestObserver.onNext(invocation.getArguments()[0]); + Object[] arguments = invocation.getArguments(); + final StreamObserver requestObserver; + if (arguments.length == 2) { + StreamObserver responseObserver = (StreamObserver) arguments[1]; + requestObserver = streamCall(call, request, responseObserver); + requestObserver.onNext(invocation.getArguments()[0]); + } else if (arguments.length == 1) { + StreamObserver responseObserver = (StreamObserver) arguments[0]; + requestObserver = streamCall(call, request, responseObserver); + requestObserver.onNext(null); + } else { + throw new IllegalStateException( + "The first parameter must be a StreamObserver when there are no parameters, or the second parameter must be a StreamObserver when there are parameters"); + } requestObserver.onCompleted(); return new AsyncRpcResult(CompletableFuture.completedFuture(new AppResponse()), invocation); }