From b7739588db8ac6d4a555e365b59bd498de900868 Mon Sep 17 00:00:00 2001 From: karsonto Date: Tue, 9 Jan 2024 17:40:13 +0800 Subject: [PATCH 1/2] processor enhancement --- .../runtime/boot/AbstractHTTPServer.java | 82 ++++++++++--------- .../runtime/boot/EventMeshHTTPServer.java | 48 +++++------ .../AbstractHttpRequestProcessor.java | 24 ++++++ .../http/processor/AdminMetricsProcessor.java | 9 +- .../processor/AdminShutdownProcessor.java | 9 +- .../processor/BatchSendMessageProcessor.java | 9 +- .../BatchSendMessageV2Processor.java | 9 +- .../http/processor/CreateTopicProcessor.java | 6 ++ .../http/processor/DeleteTopicProcessor.java | 6 ++ .../http/processor/HandlerService.java | 17 ++-- .../http/processor/HeartBeatProcessor.java | 9 +- .../http/processor/HttpProcessor.java | 7 ++ .../LocalSubscribeEventProcessor.java | 6 ++ .../LocalUnSubscribeEventProcessor.java | 6 ++ .../processor/QuerySubscriptionProcessor.java | 6 ++ .../RemoteSubscribeEventProcessor.java | 6 ++ .../RemoteUnSubscribeEventProcessor.java | 6 ++ .../http/processor/ReplyMessageProcessor.java | 9 +- .../processor/SendAsyncEventProcessor.java | 6 ++ .../processor/SendAsyncMessageProcessor.java | 9 +- .../SendAsyncRemoteEventProcessor.java | 6 ++ .../processor/SendSyncMessageProcessor.java | 9 +- .../http/processor/SubscribeProcessor.java | 11 ++- .../http/processor/UnSubscribeProcessor.java | 9 +- .../processor/inf/HttpRequestProcessor.java | 7 ++ 25 files changed, 240 insertions(+), 91 deletions(-) create mode 100644 eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/AbstractHttpRequestProcessor.java diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java index da33f98529..20b7691cfb 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java @@ -28,7 +28,6 @@ import org.apache.eventmesh.common.protocol.http.header.Header; import org.apache.eventmesh.common.utils.AssertUtils; import org.apache.eventmesh.common.utils.LogUtils; -import org.apache.eventmesh.runtime.common.Pair; import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration; import org.apache.eventmesh.runtime.constants.EventMeshConstants; import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext; @@ -96,7 +95,6 @@ /** * HTTP serves as the runtime module server for the protocol - * */ @Slf4j public abstract class AbstractHTTPServer extends AbstractRemotingServer { @@ -119,7 +117,7 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer { /** * key: request code */ - protected final transient Map> httpRequestProcessorTable = + protected final transient Map httpRequestProcessorTable = new ConcurrentHashMap<>(64); private HttpConnectionHandler httpConnectionHandler; @@ -199,11 +197,10 @@ public void shutdown() throws Exception { /** * Registers the processors required by the runtime module */ - public void registerProcessor(final Integer requestCode, final HttpRequestProcessor processor, final ThreadPoolExecutor executor) { + public void registerProcessor(final Integer requestCode, final HttpRequestProcessor processor) { AssertUtils.notNull(requestCode, "requestCode can't be null"); AssertUtils.notNull(processor, "processor can't be null"); - AssertUtils.notNull(executor, "executor can't be null"); - this.httpRequestProcessorTable.put(requestCode.toString(), new Pair<>(processor, executor)); + this.httpRequestProcessorTable.putIfAbsent(requestCode.toString(), processor); } /** @@ -402,46 +399,55 @@ private void injectHttpRequestHeader(final ChannelHandlerContext ctx, final Http private void processHttpCommandRequest(final ChannelHandlerContext ctx, final AsyncContext asyncContext) { final HttpCommand request = asyncContext.getRequest(); - final Pair choosed = httpRequestProcessorTable.get(request.getRequestCode()); - try { - choosed.getObject2().submit(() -> { - try { - final HttpRequestProcessor processor = choosed.getObject1(); - if (processor.rejectRequest()) { - final HttpCommand responseCommand = - request.createHttpCommandResponse(EventMeshRetCode.EVENTMESH_REJECT_BY_PROCESSOR_ERROR); - asyncContext.onComplete(responseCommand); - - if (asyncContext.isComplete()) { - sendResponse(ctx, responseCommand.httpResponse()); - LogUtils.debug(log, "{}", asyncContext.getResponse()); - final Map traceMap = asyncContext.getRequest().getHeader().toMap(); - TraceUtils.finishSpanWithException(TraceUtils.prepareServerSpan(traceMap, + final HttpRequestProcessor choosed = httpRequestProcessorTable.get(request.getRequestCode()); + Runnable runnable = () -> { + try { + final HttpRequestProcessor processor = choosed; + if (processor.rejectRequest()) { + final HttpCommand responseCommand = + request.createHttpCommandResponse(EventMeshRetCode.EVENTMESH_REJECT_BY_PROCESSOR_ERROR); + asyncContext.onComplete(responseCommand); + + if (asyncContext.isComplete()) { + sendResponse(ctx, responseCommand.httpResponse()); + LogUtils.debug(log, "{}", asyncContext.getResponse()); + final Map traceMap = asyncContext.getRequest().getHeader().toMap(); + TraceUtils.finishSpanWithException(TraceUtils.prepareServerSpan(traceMap, EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, false), - traceMap, - EventMeshRetCode.EVENTMESH_REJECT_BY_PROCESSOR_ERROR.getErrMsg(), null); - } - - return; + traceMap, + EventMeshRetCode.EVENTMESH_REJECT_BY_PROCESSOR_ERROR.getErrMsg(), null); } - processor.processRequest(ctx, asyncContext); - if (!asyncContext.isComplete()) { - return; - } + return; + } - metrics.getSummaryMetrics() - .recordHTTPReqResTimeCost(System.currentTimeMillis() - request.getReqTime()); + processor.processRequest(ctx, asyncContext); + if (!asyncContext.isComplete()) { + return; + } - LogUtils.debug(log, "{}", asyncContext.getResponse()); + metrics.getSummaryMetrics() + .recordHTTPReqResTimeCost(System.currentTimeMillis() - request.getReqTime()); - sendResponse(ctx, asyncContext.getResponse().httpResponse()); + LogUtils.debug(log, "{}", asyncContext.getResponse()); + + sendResponse(ctx, asyncContext.getResponse().httpResponse()); + + } catch (Exception e) { + log.error("process error", e); + } + }; + + try { + if (Objects.nonNull(choosed.executor())) { + choosed.executor().execute(() -> { + runnable.run(); + }); + } else { + runnable.run(); + } - } catch (Exception e) { - log.error("process error", e); - } - }); } catch (RejectedExecutionException re) { asyncContext.onComplete(request.createHttpCommandResponse(EventMeshRetCode.OVERLOAD)); metrics.getSummaryMetrics().recordHTTPDiscard(); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java index cfe1ff14dc..d6d017bd68 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java @@ -63,7 +63,6 @@ import java.util.List; import java.util.Optional; -import java.util.concurrent.ThreadPoolExecutor; import org.assertj.core.util.Lists; @@ -240,68 +239,59 @@ private void unRegister() { } private void registerHTTPRequestProcessor() throws Exception { - HTTPThreadPoolGroup httpThreadPoolGroup = super.getHttpThreadPoolGroup(); - - ThreadPoolExecutor batchMsgExecutor = httpThreadPoolGroup.getBatchMsgExecutor(); final BatchSendMessageProcessor batchSendMessageProcessor = new BatchSendMessageProcessor(this); - registerProcessor(RequestCode.MSG_BATCH_SEND.getRequestCode(), batchSendMessageProcessor, batchMsgExecutor); + registerProcessor(RequestCode.MSG_BATCH_SEND.getRequestCode(), batchSendMessageProcessor); final BatchSendMessageV2Processor batchSendMessageV2Processor = new BatchSendMessageV2Processor(this); - registerProcessor(RequestCode.MSG_BATCH_SEND_V2.getRequestCode(), batchSendMessageV2Processor, - batchMsgExecutor); + registerProcessor(RequestCode.MSG_BATCH_SEND_V2.getRequestCode(), batchSendMessageV2Processor); - ThreadPoolExecutor sendMsgExecutor = httpThreadPoolGroup.getSendMsgExecutor(); final SendSyncMessageProcessor sendSyncMessageProcessor = new SendSyncMessageProcessor(this); - registerProcessor(RequestCode.MSG_SEND_SYNC.getRequestCode(), sendSyncMessageProcessor, sendMsgExecutor); + registerProcessor(RequestCode.MSG_SEND_SYNC.getRequestCode(), sendSyncMessageProcessor); final SendAsyncMessageProcessor sendAsyncMessageProcessor = new SendAsyncMessageProcessor(this); - registerProcessor(RequestCode.MSG_SEND_ASYNC.getRequestCode(), sendAsyncMessageProcessor, sendMsgExecutor); + registerProcessor(RequestCode.MSG_SEND_ASYNC.getRequestCode(), sendAsyncMessageProcessor); final SendAsyncEventProcessor sendAsyncEventProcessor = new SendAsyncEventProcessor(this); - this.getHandlerService().register(sendAsyncEventProcessor, sendMsgExecutor); + this.getHandlerService().register(sendAsyncEventProcessor); - ThreadPoolExecutor remoteMsgExecutor = httpThreadPoolGroup.getRemoteMsgExecutor(); final SendAsyncRemoteEventProcessor sendAsyncRemoteEventProcessor = new SendAsyncRemoteEventProcessor(this); - this.getHandlerService().register(sendAsyncRemoteEventProcessor, remoteMsgExecutor); + this.getHandlerService().register(sendAsyncRemoteEventProcessor); - ThreadPoolExecutor runtimeAdminExecutor = httpThreadPoolGroup.getRuntimeAdminExecutor(); final AdminMetricsProcessor adminMetricsProcessor = new AdminMetricsProcessor(this); - registerProcessor(RequestCode.ADMIN_METRICS.getRequestCode(), adminMetricsProcessor, runtimeAdminExecutor); + registerProcessor(RequestCode.ADMIN_METRICS.getRequestCode(), adminMetricsProcessor); - ThreadPoolExecutor clientManageExecutor = httpThreadPoolGroup.getClientManageExecutor(); final HeartBeatProcessor heartProcessor = new HeartBeatProcessor(this); - registerProcessor(RequestCode.HEARTBEAT.getRequestCode(), heartProcessor, clientManageExecutor); + registerProcessor(RequestCode.HEARTBEAT.getRequestCode(), heartProcessor); final SubscribeProcessor subscribeProcessor = new SubscribeProcessor(this); - registerProcessor(RequestCode.SUBSCRIBE.getRequestCode(), subscribeProcessor, clientManageExecutor); + registerProcessor(RequestCode.SUBSCRIBE.getRequestCode(), subscribeProcessor); final LocalSubscribeEventProcessor localSubscribeEventProcessor = new LocalSubscribeEventProcessor(this); - this.getHandlerService().register(localSubscribeEventProcessor, clientManageExecutor); + this.getHandlerService().register(localSubscribeEventProcessor); final RemoteSubscribeEventProcessor remoteSubscribeEventProcessor = new RemoteSubscribeEventProcessor(this); - this.getHandlerService().register(remoteSubscribeEventProcessor, clientManageExecutor); + this.getHandlerService().register(remoteSubscribeEventProcessor); final UnSubscribeProcessor unSubscribeProcessor = new UnSubscribeProcessor(this); - registerProcessor(RequestCode.UNSUBSCRIBE.getRequestCode(), unSubscribeProcessor, clientManageExecutor); + registerProcessor(RequestCode.UNSUBSCRIBE.getRequestCode(), unSubscribeProcessor); final LocalUnSubscribeEventProcessor localUnSubscribeEventProcessor = new LocalUnSubscribeEventProcessor(this); - this.getHandlerService().register(localUnSubscribeEventProcessor, clientManageExecutor); + this.getHandlerService().register(localUnSubscribeEventProcessor); final RemoteUnSubscribeEventProcessor remoteUnSubscribeEventProcessor = new RemoteUnSubscribeEventProcessor(this); - this.getHandlerService().register(remoteUnSubscribeEventProcessor, clientManageExecutor); + this.getHandlerService().register(remoteUnSubscribeEventProcessor); - ThreadPoolExecutor replyMsgExecutor = httpThreadPoolGroup.getReplyMsgExecutor(); final ReplyMessageProcessor replyMessageProcessor = new ReplyMessageProcessor(this); - registerProcessor(RequestCode.REPLY_MESSAGE.getRequestCode(), replyMessageProcessor, replyMsgExecutor); + registerProcessor(RequestCode.REPLY_MESSAGE.getRequestCode(), replyMessageProcessor); final CreateTopicProcessor createTopicProcessor = new CreateTopicProcessor(this); - this.getHandlerService().register(createTopicProcessor, clientManageExecutor); + this.getHandlerService().register(createTopicProcessor); final DeleteTopicProcessor deleteTopicProcessor = new DeleteTopicProcessor(this); - this.getHandlerService().register(deleteTopicProcessor, clientManageExecutor); + this.getHandlerService().register(deleteTopicProcessor); final QuerySubscriptionProcessor querySubscriptionProcessor = new QuerySubscriptionProcessor(this); - this.getHandlerService().register(querySubscriptionProcessor, clientManageExecutor); + this.getHandlerService().register(querySubscriptionProcessor); registerWebhook(); } @@ -371,4 +361,6 @@ public MetaStorage getMetaStorage() { public HTTPClientPool getHttpClientPool() { return httpClientPool; } + + } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/AbstractHttpRequestProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/AbstractHttpRequestProcessor.java new file mode 100644 index 0000000000..81a4747aab --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/AbstractHttpRequestProcessor.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.core.protocol.http.processor; + +import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor; + +public abstract class AbstractHttpRequestProcessor implements HttpRequestProcessor { + +} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/AdminMetricsProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/AdminMetricsProcessor.java index 817d339b6b..82bf0996b0 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/AdminMetricsProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/AdminMetricsProcessor.java @@ -17,21 +17,26 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor; +import java.util.concurrent.Executor; import org.apache.eventmesh.common.protocol.http.HttpCommand; import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer; import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext; -import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor; import io.netty.channel.ChannelHandlerContext; import lombok.RequiredArgsConstructor; @RequiredArgsConstructor -public class AdminMetricsProcessor implements HttpRequestProcessor { +public class AdminMetricsProcessor extends AbstractHttpRequestProcessor { private final EventMeshHTTPServer eventMeshHTTPServer; @Override public void processRequest(ChannelHandlerContext ctx, AsyncContext asyncContext) throws Exception { } + + @Override + public Executor executor() { + return eventMeshHTTPServer.getHttpThreadPoolGroup().getRuntimeAdminExecutor(); + } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/AdminShutdownProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/AdminShutdownProcessor.java index e9e9751301..b998ee1749 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/AdminShutdownProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/AdminShutdownProcessor.java @@ -17,6 +17,7 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor; +import java.util.concurrent.Executor; import org.apache.eventmesh.common.protocol.http.HttpCommand; import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode; import org.apache.eventmesh.common.protocol.http.common.RequestCode; @@ -24,7 +25,6 @@ import org.apache.eventmesh.runtime.boot.EventMeshServer; import org.apache.eventmesh.runtime.constants.EventMeshConstants; import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext; -import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor; import org.apache.eventmesh.runtime.util.RemotingHelper; import org.slf4j.Logger; @@ -35,7 +35,7 @@ import lombok.RequiredArgsConstructor; @RequiredArgsConstructor -public class AdminShutdownProcessor implements HttpRequestProcessor { +public class AdminShutdownProcessor extends AbstractHttpRequestProcessor { public final Logger cmdLogger = LoggerFactory.getLogger(EventMeshConstants.CMD); @@ -54,4 +54,9 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext HttpCommand responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(EventMeshRetCode.SUCCESS); asyncContext.onComplete(responseEventMeshCommand); } + + @Override + public Executor executor() { + return (Runnable runnable)-> {runnable.run();}; + } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java index 1606a8c7af..00dac662be 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java @@ -17,6 +17,7 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor; +import java.util.concurrent.Executor; import org.apache.eventmesh.api.SendCallback; import org.apache.eventmesh.api.SendResult; import org.apache.eventmesh.api.exception.OnExceptionContext; @@ -41,7 +42,6 @@ import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration; import org.apache.eventmesh.runtime.constants.EventMeshConstants; import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext; -import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor; import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer; import org.apache.eventmesh.runtime.core.protocol.producer.SendMessageContext; import org.apache.eventmesh.runtime.util.RemotingHelper; @@ -66,7 +66,7 @@ import com.google.common.base.Stopwatch; -public class BatchSendMessageProcessor implements HttpRequestProcessor { +public class BatchSendMessageProcessor extends AbstractHttpRequestProcessor { private final Logger cmdLogger = LoggerFactory.getLogger(EventMeshConstants.CMD); @@ -289,4 +289,9 @@ public void onException(OnExceptionContext context) { SendMessageBatchResponseBody.class); return; } + + @Override + public Executor executor() { + return eventMeshHTTPServer.getHttpThreadPoolGroup().getBatchMsgExecutor(); + } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java index 41e9ab2628..3fe0a0d929 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java @@ -17,6 +17,7 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor; +import java.util.concurrent.Executor; import org.apache.eventmesh.api.SendCallback; import org.apache.eventmesh.api.SendResult; import org.apache.eventmesh.api.exception.OnExceptionContext; @@ -40,7 +41,6 @@ import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration; import org.apache.eventmesh.runtime.constants.EventMeshConstants; import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext; -import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor; import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer; import org.apache.eventmesh.runtime.core.protocol.producer.SendMessageContext; import org.apache.eventmesh.runtime.util.EventMeshUtil; @@ -59,7 +59,7 @@ import io.cloudevents.core.builder.CloudEventBuilder; import io.netty.channel.ChannelHandlerContext; -public class BatchSendMessageV2Processor implements HttpRequestProcessor { +public class BatchSendMessageV2Processor extends AbstractHttpRequestProcessor { private final Logger cmdLogger = LoggerFactory.getLogger(EventMeshConstants.CMD); @@ -255,4 +255,9 @@ public void onException(OnExceptionContext context) { completeResponse(request, asyncContext, sendMessageBatchV2ResponseHeader, EventMeshRetCode.SUCCESS, null, SendMessageBatchV2ResponseBody.class); } + + @Override + public Executor executor() { + return eventMeshHTTPServer.getHttpThreadPoolGroup().getBatchMsgExecutor(); + } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/CreateTopicProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/CreateTopicProcessor.java index 0554197bb2..8354dac291 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/CreateTopicProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/CreateTopicProcessor.java @@ -17,6 +17,7 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor; +import java.util.concurrent.Executor; import org.apache.eventmesh.common.protocol.http.HttpEventWrapper; import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode; import org.apache.eventmesh.common.protocol.http.common.ProtocolKey; @@ -152,4 +153,9 @@ public void handler(HandlerService.HandlerSpecific handlerSpecific, HttpRequest public String[] paths() { return new String[]{RequestURI.CREATE_TOPIC.getRequestURI()}; } + + @Override + public Executor executor() { + return eventMeshHTTPServer.getHttpThreadPoolGroup().getClientManageExecutor(); + } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/DeleteTopicProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/DeleteTopicProcessor.java index 417397721a..5924251986 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/DeleteTopicProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/DeleteTopicProcessor.java @@ -17,6 +17,7 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor; +import java.util.concurrent.Executor; import org.apache.eventmesh.common.protocol.http.HttpEventWrapper; import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode; import org.apache.eventmesh.common.protocol.http.common.ProtocolKey; @@ -175,4 +176,9 @@ public void handler(HandlerService.HandlerSpecific handlerSpecific, HttpRequest public String[] paths() { return new String[]{RequestURI.DELETE_TOPIC.getRequestURI()}; } + + @Override + public Executor executor() { + return eventMeshHTTPServer.getHttpThreadPoolGroup().getClientManageExecutor(); + } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HandlerService.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HandlerService.java index 7fb0bde52d..b27e7095c5 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HandlerService.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HandlerService.java @@ -17,6 +17,7 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor; +import java.util.concurrent.Executor; import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.common.enums.ConnectionType; import org.apache.eventmesh.common.protocol.http.HttpEventWrapper; @@ -87,20 +88,26 @@ public void init() { log.info("HandlerService start "); } - public void register(HttpProcessor httpProcessor, ThreadPoolExecutor threadPoolExecutor) { + public void register(HttpProcessor httpProcessor, Executor threadPoolExecutor) { for (String path : httpProcessor.paths()) { this.register(path, httpProcessor, threadPoolExecutor); } } - public void register(String path, HttpProcessor httpProcessor, ThreadPoolExecutor threadPoolExecutor) { + public void register(HttpProcessor httpProcessor) { + for (String path : httpProcessor.paths()) { + this.register(path, httpProcessor, httpProcessor.executor()); + } + } + + public void register(String path, HttpProcessor httpProcessor, Executor threadPoolExecutor) { if (httpProcessorMap.containsKey(path)) { throw new RuntimeException(String.format("HandlerService path %s repeat, repeat processor is %s ", path, httpProcessor.getClass().getSimpleName())); } ProcessorWrapper processorWrapper = new ProcessorWrapper(); - processorWrapper.threadPoolExecutor = threadPoolExecutor; + processorWrapper.executor = threadPoolExecutor; if (httpProcessor instanceof AsyncHttpProcessor) { processorWrapper.async = (AsyncHttpProcessor) httpProcessor; } @@ -141,7 +148,7 @@ public void handler(ChannelHandlerContext ctx, HttpRequest httpRequest, ThreadPo handlerSpecific.ctx = ctx; handlerSpecific.traceOperation = traceOperation; handlerSpecific.asyncContext = new AsyncContext<>(new HttpEventWrapper(), null, asyncContextCompleteHandler); - processorWrapper.threadPoolExecutor.execute(handlerSpecific); + processorWrapper.executor.execute(handlerSpecific); } catch (Exception e) { log.error(e.getMessage(), e); this.sendResponse(ctx, httpRequest, HttpResponseUtils.createInternalServerError()); @@ -381,7 +388,7 @@ public void recordSendBatchMsgFailed(int count) { private static class ProcessorWrapper { - private ThreadPoolExecutor threadPoolExecutor; + private Executor executor; private HttpProcessor httpProcessor; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java index c6914d34ff..ebce934305 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java @@ -17,6 +17,7 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor; +import java.util.concurrent.Executor; import org.apache.eventmesh.common.protocol.http.HttpCommand; import org.apache.eventmesh.common.protocol.http.body.client.HeartbeatRequestBody; import org.apache.eventmesh.common.protocol.http.body.client.HeartbeatResponseBody; @@ -34,7 +35,6 @@ import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext; import org.apache.eventmesh.runtime.core.protocol.http.async.CompleteHandler; import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.Client; -import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor; import org.apache.eventmesh.runtime.util.EventMeshUtil; import org.apache.eventmesh.runtime.util.RemotingHelper; @@ -53,7 +53,7 @@ import lombok.extern.slf4j.Slf4j; @Slf4j -public class HeartBeatProcessor implements HttpRequestProcessor { +public class HeartBeatProcessor extends AbstractHttpRequestProcessor { private final transient EventMeshHTTPServer eventMeshHTTPServer; @@ -187,6 +187,11 @@ public void processRequest(final ChannelHandlerContext ctx, final AsyncContext tmpClientList, final List localClientList) { Objects.requireNonNull(tmpClientList, "tmpClientList can not be null"); Objects.requireNonNull(localClientList, "localClientList can not be null"); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HttpProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HttpProcessor.java index 9b74e4301b..8cd1c9ede1 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HttpProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HttpProcessor.java @@ -19,6 +19,7 @@ import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpResponse; +import java.util.concurrent.Executor; /** * http processor @@ -28,4 +29,10 @@ public interface HttpProcessor { String[] paths(); HttpResponse handler(HttpRequest httpRequest); + + /** + * + * @return {@link Executor} + */ + default Executor executor() {return null;} } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalSubscribeEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalSubscribeEventProcessor.java index 7e9086d71e..61ddc1a842 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalSubscribeEventProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalSubscribeEventProcessor.java @@ -17,6 +17,7 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor; +import java.util.concurrent.Executor; import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.common.protocol.SubscriptionItem; import org.apache.eventmesh.common.protocol.http.HttpEventWrapper; @@ -192,6 +193,11 @@ public String[] paths() { return new String[]{RequestURI.SUBSCRIBE_LOCAL.getRequestURI()}; } + @Override + public Executor executor() { + return eventMeshHTTPServer.getHttpThreadPoolGroup().getClientManageExecutor(); + } + private ClientInfo getClientInfo(final HttpEventWrapper requestWrapper) { final Map requestHeaderMap = requestWrapper.getSysHeaderMap(); ClientInfo clientInfo = new ClientInfo(); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalUnSubscribeEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalUnSubscribeEventProcessor.java index 0160d36e3c..c61e73c1eb 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalUnSubscribeEventProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalUnSubscribeEventProcessor.java @@ -17,6 +17,7 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor; +import java.util.concurrent.Executor; import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.common.protocol.http.HttpEventWrapper; import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode; @@ -232,6 +233,11 @@ public String[] paths() { return new String[]{RequestURI.UNSUBSCRIBE_LOCAL.getRequestURI()}; } + @Override + public Executor executor() { + return eventMeshHTTPServer.getHttpThreadPoolGroup().getClientManageExecutor(); + } + private void registerClient(final HttpEventWrapper requestWrapper, final String consumerGroup, final List topicList, final String url) { Objects.requireNonNull(requestWrapper, "requestWrapper can not be null"); Objects.requireNonNull(consumerGroup, "consumerGroup can not be null"); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/QuerySubscriptionProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/QuerySubscriptionProcessor.java index ea01494442..1fbaf0e8c5 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/QuerySubscriptionProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/QuerySubscriptionProcessor.java @@ -17,6 +17,7 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor; +import java.util.concurrent.Executor; import org.apache.eventmesh.common.protocol.http.HttpEventWrapper; import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode; import org.apache.eventmesh.common.protocol.http.common.ProtocolKey; @@ -115,4 +116,9 @@ public void handler(HandlerService.HandlerSpecific handlerSpecific, HttpRequest public String[] paths() { return new String[]{RequestURI.SUBSCRIPTION_QUERY.getRequestURI()}; } + + @Override + public Executor executor() { + return eventMeshHTTPServer.getHttpThreadPoolGroup().getClientManageExecutor(); + } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteSubscribeEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteSubscribeEventProcessor.java index 43f90ea661..8c8274f37f 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteSubscribeEventProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteSubscribeEventProcessor.java @@ -17,6 +17,7 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor; +import java.util.concurrent.Executor; import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.common.protocol.SubscriptionItem; import org.apache.eventmesh.common.protocol.http.HttpEventWrapper; @@ -220,4 +221,9 @@ public String[] paths() { return new String[]{RequestURI.SUBSCRIBE_REMOTE.getRequestURI()}; } + @Override + public Executor executor() { + return eventMeshHTTPServer.getHttpThreadPoolGroup().getClientManageExecutor(); + } + } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteUnSubscribeEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteUnSubscribeEventProcessor.java index edbd9ccfa9..f43beae646 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteUnSubscribeEventProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteUnSubscribeEventProcessor.java @@ -17,6 +17,7 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor; +import java.util.concurrent.Executor; import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.common.protocol.SubscriptionItem; import org.apache.eventmesh.common.protocol.http.HttpEventWrapper; @@ -187,4 +188,9 @@ public String[] paths() { return new String[]{RequestURI.UNSUBSCRIBE_REMOTE.getRequestURI()}; } + @Override + public Executor executor() { + return eventMeshHTTPServer.getHttpThreadPoolGroup().getClientManageExecutor(); + } + } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java index 510bb20542..0dbf0cdf96 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java @@ -17,6 +17,7 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor; +import java.util.concurrent.Executor; import org.apache.eventmesh.api.SendCallback; import org.apache.eventmesh.api.SendResult; import org.apache.eventmesh.api.exception.OnExceptionContext; @@ -40,7 +41,6 @@ import org.apache.eventmesh.runtime.constants.EventMeshConstants; import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext; import org.apache.eventmesh.runtime.core.protocol.http.async.CompleteHandler; -import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor; import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer; import org.apache.eventmesh.runtime.core.protocol.producer.SendMessageContext; import org.apache.eventmesh.runtime.util.EventMeshUtil; @@ -58,7 +58,7 @@ import io.cloudevents.core.builder.CloudEventBuilder; import io.netty.channel.ChannelHandlerContext; -public class ReplyMessageProcessor implements HttpRequestProcessor { +public class ReplyMessageProcessor extends AbstractHttpRequestProcessor { public final Logger messageLogger = LoggerFactory.getLogger(EventMeshConstants.MESSAGE); @@ -248,4 +248,9 @@ public void onException(OnExceptionContext context) { summaryMetrics.recordReplyMsgCost(endTime - startTime); } } + + @Override + public Executor executor() { + return eventMeshHTTPServer.getHttpThreadPoolGroup().getReplyMsgExecutor(); + } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java index 34e4ffcb35..3dbcd1ae94 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java @@ -17,6 +17,7 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor; +import java.util.concurrent.Executor; import org.apache.eventmesh.api.SendCallback; import org.apache.eventmesh.api.SendResult; import org.apache.eventmesh.api.exception.AclException; @@ -312,4 +313,9 @@ public void onException(final OnExceptionContext context) { public String[] paths() { return new String[]{RequestURI.PUBLISH.getRequestURI()}; } + + @Override + public Executor executor() { + return eventMeshHTTPServer.getHttpThreadPoolGroup().getSendMsgExecutor(); + } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java index 153c607fbc..f37200adb7 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java @@ -17,6 +17,7 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor; +import java.util.concurrent.Executor; import org.apache.eventmesh.api.SendCallback; import org.apache.eventmesh.api.SendResult; import org.apache.eventmesh.api.exception.OnExceptionContext; @@ -41,7 +42,6 @@ import org.apache.eventmesh.runtime.constants.EventMeshConstants; import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext; import org.apache.eventmesh.runtime.core.protocol.http.async.CompleteHandler; -import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor; import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer; import org.apache.eventmesh.runtime.core.protocol.producer.SendMessageContext; import org.apache.eventmesh.runtime.util.EventMeshUtil; @@ -63,7 +63,7 @@ import io.netty.channel.ChannelHandlerContext; import io.opentelemetry.api.trace.Span; -public class SendAsyncMessageProcessor implements HttpRequestProcessor { +public class SendAsyncMessageProcessor extends AbstractHttpRequestProcessor { private static final Logger MESSAGE_LOGGER = LoggerFactory.getLogger(EventMeshConstants.MESSAGE); @@ -307,6 +307,11 @@ public void onException(OnExceptionContext context) { } } + @Override + public Executor executor() { + return eventMeshHTTPServer.getHttpThreadPoolGroup().getSendMsgExecutor(); + } + private void spanWithException(CloudEvent event, String protocolVersion, EventMeshRetCode retCode) { Span excepSpan = TraceUtils.prepareServerSpan(EventMeshUtil.getCloudEventExtensionMap(protocolVersion, event), EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, false); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncRemoteEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncRemoteEventProcessor.java index 3f59234bde..c158ffb8eb 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncRemoteEventProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncRemoteEventProcessor.java @@ -17,6 +17,7 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor; +import java.util.concurrent.Executor; import org.apache.eventmesh.api.SendCallback; import org.apache.eventmesh.api.SendResult; import org.apache.eventmesh.api.exception.OnExceptionContext; @@ -327,4 +328,9 @@ public String[] paths() { return new String[]{RequestURI.PUBLISH_BRIDGE.getRequestURI()}; } + @Override + public Executor executor() { + return eventMeshHTTPServer.getHttpThreadPoolGroup().getRemoteMsgExecutor(); + } + } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java index 44dcdff50e..a0b503fb01 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java @@ -17,6 +17,7 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor; +import java.util.concurrent.Executor; import org.apache.eventmesh.api.RequestReplyCallback; import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.common.protocol.ProtocolTransportObject; @@ -40,7 +41,6 @@ import org.apache.eventmesh.runtime.constants.EventMeshConstants; import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext; import org.apache.eventmesh.runtime.core.protocol.http.async.CompleteHandler; -import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor; import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer; import org.apache.eventmesh.runtime.core.protocol.producer.SendMessageContext; import org.apache.eventmesh.runtime.util.EventMeshUtil; @@ -59,7 +59,7 @@ import lombok.extern.slf4j.Slf4j; @Slf4j -public class SendSyncMessageProcessor implements HttpRequestProcessor { +public class SendSyncMessageProcessor extends AbstractHttpRequestProcessor { private transient EventMeshHTTPServer eventMeshHTTPServer; @@ -287,4 +287,9 @@ public void onException(final Throwable e) { return; } + + @Override + public Executor executor() { + return eventMeshHTTPServer.getHttpThreadPoolGroup().getSendMsgExecutor(); + } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java index 971631689d..297f0bf496 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java @@ -17,6 +17,7 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor; +import java.util.concurrent.Executor; import org.apache.eventmesh.common.protocol.SubscriptionItem; import org.apache.eventmesh.common.protocol.http.HttpCommand; import org.apache.eventmesh.common.protocol.http.body.client.SubscribeRequestBody; @@ -37,7 +38,6 @@ import org.apache.eventmesh.runtime.core.consumer.SubscriptionManager; import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext; import org.apache.eventmesh.runtime.core.protocol.http.async.CompleteHandler; -import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor; import org.apache.eventmesh.runtime.util.EventMeshUtil; import org.apache.eventmesh.runtime.util.RemotingHelper; import org.apache.eventmesh.runtime.util.WebhookUtil; @@ -52,7 +52,7 @@ import lombok.extern.slf4j.Slf4j; @Slf4j -public class SubscribeProcessor implements HttpRequestProcessor { +public class SubscribeProcessor extends AbstractHttpRequestProcessor { private final transient EventMeshHTTPServer eventMeshHTTPServer; @@ -199,6 +199,11 @@ public void processRequest(final ChannelHandlerContext ctx, final AsyncContext topicList, diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/HttpRequestProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/HttpRequestProcessor.java index 570f11c998..0cf66d3446 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/HttpRequestProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/HttpRequestProcessor.java @@ -17,6 +17,7 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor.inf; +import java.util.concurrent.Executor; import org.apache.eventmesh.common.protocol.http.HttpCommand; import org.apache.eventmesh.common.protocol.http.body.Body; import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode; @@ -67,4 +68,10 @@ default String getExtension(CloudEvent event, String protocolKey) { return Objects.isNull(extension) ? "" : extension.toString(); } + /** + * + * @return {@link Executor} + */ + Executor executor(); + } From da6c96f3f69fed3813ef2c233c22ac0f8b23cf96 Mon Sep 17 00:00:00 2001 From: karsonto Date: Wed, 10 Jan 2024 10:17:51 +0800 Subject: [PATCH 2/2] fix code style --- .../runtime/boot/EventMeshHTTPServer.java | 9 +-- .../http/processor/AdminMetricsProcessor.java | 4 +- .../processor/AdminShutdownProcessor.java | 7 +- .../processor/BatchSendMessageProcessor.java | 2 +- .../BatchSendMessageV2Processor.java | 2 +- .../http/processor/CreateTopicProcessor.java | 4 +- .../http/processor/DeleteTopicProcessor.java | 4 +- .../http/processor/HandlerService.java | 2 +- .../http/processor/HeartBeatProcessor.java | 2 +- .../http/processor/HttpProcessor.java | 8 +- .../LocalSubscribeEventProcessor.java | 4 +- .../LocalUnSubscribeEventProcessor.java | 8 +- .../processor/QuerySubscriptionProcessor.java | 4 +- .../RemoteSubscribeEventProcessor.java | 4 +- .../RemoteUnSubscribeEventProcessor.java | 4 +- .../http/processor/ReplyMessageProcessor.java | 2 +- .../processor/SendAsyncEventProcessor.java | 4 +- .../processor/SendAsyncMessageProcessor.java | 4 +- .../SendAsyncRemoteEventProcessor.java | 4 +- .../processor/SendSyncMessageProcessor.java | 6 +- .../http/processor/ShortHttpProcessor.java | 1 + .../http/processor/SubscribeProcessor.java | 4 +- .../http/processor/UnSubscribeProcessor.java | 78 +++++++------------ .../http/processor/WebHookProcessor.java | 2 +- .../processor/inf/HttpRequestProcessor.java | 3 +- 25 files changed, 78 insertions(+), 98 deletions(-) diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java index d6d017bd68..11c1f1d55c 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java @@ -71,6 +71,7 @@ import lombok.extern.slf4j.Slf4j; + /** * Add multiple managers to the underlying server */ @@ -84,22 +85,16 @@ public class EventMeshHTTPServer extends AbstractHTTPServer { private final Acl acl; private final EventBus eventBus = new EventBus(); - + private final transient HTTPClientPool httpClientPool = new HTTPClientPool(10); private ConsumerManager consumerManager; private ProducerManager producerManager; private SubscriptionManager subscriptionManager; - private FilterEngine filterEngine; - private TransformerEngine transformerEngine; - private HttpRetryer httpRetryer; - private transient RateLimiter msgRateLimiter; private transient RateLimiter batchRateLimiter; - private final transient HTTPClientPool httpClientPool = new HTTPClientPool(10); - public EventMeshHTTPServer(final EventMeshServer eventMeshServer, final EventMeshHTTPConfiguration eventMeshHttpConfiguration) { super(eventMeshHttpConfiguration.getHttpServerPort(), diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/AdminMetricsProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/AdminMetricsProcessor.java index 82bf0996b0..b3fbf0d6a8 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/AdminMetricsProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/AdminMetricsProcessor.java @@ -17,15 +17,17 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor; -import java.util.concurrent.Executor; import org.apache.eventmesh.common.protocol.http.HttpCommand; import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer; import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext; +import java.util.concurrent.Executor; + import io.netty.channel.ChannelHandlerContext; import lombok.RequiredArgsConstructor; + @RequiredArgsConstructor public class AdminMetricsProcessor extends AbstractHttpRequestProcessor { diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/AdminShutdownProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/AdminShutdownProcessor.java index b998ee1749..4688ee106f 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/AdminShutdownProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/AdminShutdownProcessor.java @@ -17,7 +17,6 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor; -import java.util.concurrent.Executor; import org.apache.eventmesh.common.protocol.http.HttpCommand; import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode; import org.apache.eventmesh.common.protocol.http.common.RequestCode; @@ -27,6 +26,8 @@ import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext; import org.apache.eventmesh.runtime.util.RemotingHelper; +import java.util.concurrent.Executor; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,6 +58,8 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext @Override public Executor executor() { - return (Runnable runnable)-> {runnable.run();}; + return (Runnable runnable) -> { + runnable.run(); + }; } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java index 00dac662be..b484e1fb9f 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java @@ -17,7 +17,6 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor; -import java.util.concurrent.Executor; import org.apache.eventmesh.api.SendCallback; import org.apache.eventmesh.api.SendResult; import org.apache.eventmesh.api.exception.OnExceptionContext; @@ -54,6 +53,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java index 3fe0a0d929..b8fc24a042 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java @@ -17,7 +17,6 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor; -import java.util.concurrent.Executor; import org.apache.eventmesh.api.SendCallback; import org.apache.eventmesh.api.SendResult; import org.apache.eventmesh.api.exception.OnExceptionContext; @@ -50,6 +49,7 @@ import org.apache.commons.lang3.StringUtils; import java.util.Objects; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/CreateTopicProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/CreateTopicProcessor.java index 8354dac291..a2098974a0 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/CreateTopicProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/CreateTopicProcessor.java @@ -17,7 +17,6 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor; -import java.util.concurrent.Executor; import org.apache.eventmesh.common.protocol.http.HttpEventWrapper; import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode; import org.apache.eventmesh.common.protocol.http.common.ProtocolKey; @@ -38,6 +37,7 @@ import java.util.HashMap; import java.util.Map; +import java.util.concurrent.Executor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -151,7 +151,7 @@ public void handler(HandlerService.HandlerSpecific handlerSpecific, HttpRequest @Override public String[] paths() { - return new String[]{RequestURI.CREATE_TOPIC.getRequestURI()}; + return new String[] {RequestURI.CREATE_TOPIC.getRequestURI()}; } @Override diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/DeleteTopicProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/DeleteTopicProcessor.java index 5924251986..a600a17e16 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/DeleteTopicProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/DeleteTopicProcessor.java @@ -17,7 +17,6 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor; -import java.util.concurrent.Executor; import org.apache.eventmesh.common.protocol.http.HttpEventWrapper; import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode; import org.apache.eventmesh.common.protocol.http.common.ProtocolKey; @@ -40,6 +39,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Executor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -174,7 +174,7 @@ public void handler(HandlerService.HandlerSpecific handlerSpecific, HttpRequest @Override public String[] paths() { - return new String[]{RequestURI.DELETE_TOPIC.getRequestURI()}; + return new String[] {RequestURI.DELETE_TOPIC.getRequestURI()}; } @Override diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HandlerService.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HandlerService.java index b27e7095c5..04518b4dab 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HandlerService.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HandlerService.java @@ -17,7 +17,6 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor; -import java.util.concurrent.Executor; import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.common.enums.ConnectionType; import org.apache.eventmesh.common.protocol.http.HttpEventWrapper; @@ -44,6 +43,7 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; import org.slf4j.Logger; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java index ebce934305..87c5b2ead6 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java @@ -17,7 +17,6 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor; -import java.util.concurrent.Executor; import org.apache.eventmesh.common.protocol.http.HttpCommand; import org.apache.eventmesh.common.protocol.http.body.client.HeartbeatRequestBody; import org.apache.eventmesh.common.protocol.http.body.client.HeartbeatResponseBody; @@ -47,6 +46,7 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; import io.netty.channel.ChannelHandlerContext; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HttpProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HttpProcessor.java index 8cd1c9ede1..64fd431b74 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HttpProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HttpProcessor.java @@ -17,9 +17,10 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor; +import java.util.concurrent.Executor; + import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpResponse; -import java.util.concurrent.Executor; /** * http processor @@ -31,8 +32,9 @@ public interface HttpProcessor { HttpResponse handler(HttpRequest httpRequest); /** - * * @return {@link Executor} */ - default Executor executor() {return null;} + default Executor executor() { + return null; + } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalSubscribeEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalSubscribeEventProcessor.java index 61ddc1a842..b1cde8a175 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalSubscribeEventProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalSubscribeEventProcessor.java @@ -17,7 +17,6 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor; -import java.util.concurrent.Executor; import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.common.protocol.SubscriptionItem; import org.apache.eventmesh.common.protocol.http.HttpEventWrapper; @@ -42,6 +41,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.Executor; import io.netty.channel.Channel; import io.netty.handler.codec.http.HttpRequest; @@ -190,7 +190,7 @@ public void handler(final HandlerService.HandlerSpecific handlerSpecific, final @Override public String[] paths() { - return new String[]{RequestURI.SUBSCRIBE_LOCAL.getRequestURI()}; + return new String[] {RequestURI.SUBSCRIBE_LOCAL.getRequestURI()}; } @Override diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalUnSubscribeEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalUnSubscribeEventProcessor.java index c61e73c1eb..32cc1bd834 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalUnSubscribeEventProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalUnSubscribeEventProcessor.java @@ -17,7 +17,6 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor; -import java.util.concurrent.Executor; import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.common.protocol.http.HttpEventWrapper; import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode; @@ -50,6 +49,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.Executor; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http.HttpRequest; @@ -194,7 +194,7 @@ public void handler(final HandlerService.HandlerSpecific handlerSpecific, final } catch (Exception e) { LogUtils.error(log, "message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms" - + "|topic={}|url={}", System.currentTimeMillis() - startTime, + + "|topic={}|url={}", System.currentTimeMillis() - startTime, JsonUtils.toJSONString(unSubTopicList), unSubscribeUrl, e); handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_UNSUBSCRIBE_ERR, responseHeaderMap, responseBodyMap, null); @@ -216,7 +216,7 @@ public void handler(final HandlerService.HandlerSpecific handlerSpecific, final .removeIf(s -> StringUtils.equals(consumerGroup, s)); } catch (Exception e) { LogUtils.error(log, "message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms" - + "|topic={}|url={}", System.currentTimeMillis() - startTime, + + "|topic={}|url={}", System.currentTimeMillis() - startTime, JsonUtils.toJSONString(unSubTopicList), unSubscribeUrl, e); handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_UNSUBSCRIBE_ERR, responseHeaderMap, responseBodyMap, null); @@ -230,7 +230,7 @@ public void handler(final HandlerService.HandlerSpecific handlerSpecific, final @Override public String[] paths() { - return new String[]{RequestURI.UNSUBSCRIBE_LOCAL.getRequestURI()}; + return new String[] {RequestURI.UNSUBSCRIBE_LOCAL.getRequestURI()}; } @Override diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/QuerySubscriptionProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/QuerySubscriptionProcessor.java index 1fbaf0e8c5..48a2bfa3a6 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/QuerySubscriptionProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/QuerySubscriptionProcessor.java @@ -17,7 +17,6 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor; -import java.util.concurrent.Executor; import org.apache.eventmesh.common.protocol.http.HttpEventWrapper; import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode; import org.apache.eventmesh.common.protocol.http.common.ProtocolKey; @@ -35,6 +34,7 @@ import java.util.HashMap; import java.util.Map; +import java.util.concurrent.Executor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -114,7 +114,7 @@ public void handler(HandlerService.HandlerSpecific handlerSpecific, HttpRequest @Override public String[] paths() { - return new String[]{RequestURI.SUBSCRIPTION_QUERY.getRequestURI()}; + return new String[] {RequestURI.SUBSCRIPTION_QUERY.getRequestURI()}; } @Override diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteSubscribeEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteSubscribeEventProcessor.java index 8c8274f37f..84a5812afc 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteSubscribeEventProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteSubscribeEventProcessor.java @@ -17,7 +17,6 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor; -import java.util.concurrent.Executor; import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.common.protocol.SubscriptionItem; import org.apache.eventmesh.common.protocol.http.HttpEventWrapper; @@ -44,6 +43,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.Executor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -218,7 +218,7 @@ public void handler(HandlerService.HandlerSpecific handlerSpecific, HttpRequest @Override public String[] paths() { - return new String[]{RequestURI.SUBSCRIBE_REMOTE.getRequestURI()}; + return new String[] {RequestURI.SUBSCRIBE_REMOTE.getRequestURI()}; } @Override diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteUnSubscribeEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteUnSubscribeEventProcessor.java index f43beae646..fbd1af445d 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteUnSubscribeEventProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteUnSubscribeEventProcessor.java @@ -17,7 +17,6 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor; -import java.util.concurrent.Executor; import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.common.protocol.SubscriptionItem; import org.apache.eventmesh.common.protocol.http.HttpEventWrapper; @@ -44,6 +43,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.Executor; import java.util.stream.Collectors; import org.slf4j.Logger; @@ -185,7 +185,7 @@ public void handler(HandlerService.HandlerSpecific handlerSpecific, HttpRequest @Override public String[] paths() { - return new String[]{RequestURI.UNSUBSCRIBE_REMOTE.getRequestURI()}; + return new String[] {RequestURI.UNSUBSCRIBE_REMOTE.getRequestURI()}; } @Override diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java index 0dbf0cdf96..6197caa303 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java @@ -17,7 +17,6 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor; -import java.util.concurrent.Executor; import org.apache.eventmesh.api.SendCallback; import org.apache.eventmesh.api.SendResult; import org.apache.eventmesh.api.exception.OnExceptionContext; @@ -49,6 +48,7 @@ import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.StringUtils; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java index 3dbcd1ae94..dd521d52a9 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java @@ -17,7 +17,6 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor; -import java.util.concurrent.Executor; import org.apache.eventmesh.api.SendCallback; import org.apache.eventmesh.api.SendResult; import org.apache.eventmesh.api.exception.AclException; @@ -54,6 +53,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import io.cloudevents.CloudEvent; @@ -311,7 +311,7 @@ public void onException(final OnExceptionContext context) { @Override public String[] paths() { - return new String[]{RequestURI.PUBLISH.getRequestURI()}; + return new String[] {RequestURI.PUBLISH.getRequestURI()}; } @Override diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java index f37200adb7..53359b5928 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java @@ -17,7 +17,6 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor; -import java.util.concurrent.Executor; import org.apache.eventmesh.api.SendCallback; import org.apache.eventmesh.api.SendResult; import org.apache.eventmesh.api.exception.OnExceptionContext; @@ -53,6 +52,7 @@ import org.apache.commons.lang3.StringUtils; import java.util.Objects; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; @@ -90,7 +90,7 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext HttpCommand request = asyncContext.getRequest(); String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); CMD_LOGGER.info("cmd={}|{}|client2eventMesh|from={}|to={}", RequestCode.get( - Integer.valueOf(request.getRequestCode())), + Integer.valueOf(request.getRequestCode())), EventMeshConstants.PROTOCOL_HTTP, remoteAddr, localAddress); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncRemoteEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncRemoteEventProcessor.java index c158ffb8eb..bd72869cac 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncRemoteEventProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncRemoteEventProcessor.java @@ -17,7 +17,6 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor; -import java.util.concurrent.Executor; import org.apache.eventmesh.api.SendCallback; import org.apache.eventmesh.api.SendResult; import org.apache.eventmesh.api.exception.OnExceptionContext; @@ -52,6 +51,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import io.cloudevents.CloudEvent; @@ -325,7 +325,7 @@ private String getExtension(final CloudEvent event, final String protocolKey) { @Override public String[] paths() { - return new String[]{RequestURI.PUBLISH_BRIDGE.getRequestURI()}; + return new String[] {RequestURI.PUBLISH_BRIDGE.getRequestURI()}; } @Override diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java index a0b503fb01..9c158ddcec 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java @@ -17,7 +17,6 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor; -import java.util.concurrent.Executor; import org.apache.eventmesh.api.RequestReplyCallback; import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.common.protocol.ProtocolTransportObject; @@ -50,6 +49,7 @@ import org.apache.commons.lang3.StringUtils; import java.util.Objects; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import io.cloudevents.CloudEvent; @@ -218,7 +218,7 @@ public void processRequest(final ChannelHandlerContext ctx, final AsyncContext asyncContext) - throws Exception { + public void processRequest(final ChannelHandlerContext ctx, final AsyncContext asyncContext) throws Exception { HttpCommand responseEventMeshCommand; final HttpCommand request = asyncContext.getRequest(); final String localAddress = IPUtils.getLocalAddress(); - LogUtils.info(log, "cmd={}|{}|client2eventMesh|from={}|to={}", - RequestCode.get(Integer.valueOf(request.getRequestCode())), - EventMeshConstants.PROTOCOL_HTTP, - RemotingHelper.parseChannelRemoteAddr(ctx.channel()), localAddress); + LogUtils.info(log, "cmd={}|{}|client2eventMesh|from={}|to={}", RequestCode.get(Integer.valueOf(request.getRequestCode())), + EventMeshConstants.PROTOCOL_HTTP, RemotingHelper.parseChannelRemoteAddr(ctx.channel()), localAddress); final UnSubscribeRequestHeader unSubscribeRequestHeader = (UnSubscribeRequestHeader) request.getHeader(); final UnSubscribeRequestBody unSubscribeRequestBody = (UnSubscribeRequestBody) request.getBody(); EventMeshHTTPConfiguration eventMeshHttpConfiguration = eventMeshHTTPServer.getEventMeshHttpConfiguration(); - final UnSubscribeResponseHeader unSubscribeResponseHeader = - UnSubscribeResponseHeader - .buildHeader(Integer.valueOf(request.getRequestCode()), - eventMeshHttpConfiguration.getEventMeshCluster(), - localAddress, eventMeshHttpConfiguration.getEventMeshEnv(), - eventMeshHttpConfiguration.getEventMeshIDC()); + final UnSubscribeResponseHeader unSubscribeResponseHeader = UnSubscribeResponseHeader.buildHeader(Integer.valueOf(request.getRequestCode()), + eventMeshHttpConfiguration.getEventMeshCluster(), localAddress, eventMeshHttpConfiguration.getEventMeshEnv(), + eventMeshHttpConfiguration.getEventMeshIDC()); // validate header - if (StringUtils.isAnyBlank(unSubscribeRequestHeader.getIdc(), unSubscribeRequestHeader.getPid(), - unSubscribeRequestHeader.getSys()) + if (StringUtils.isAnyBlank(unSubscribeRequestHeader.getIdc(), unSubscribeRequestHeader.getPid(), unSubscribeRequestHeader.getSys()) || !StringUtils.isNumeric(unSubscribeRequestHeader.getPid())) { - completeResponse(request, asyncContext, unSubscribeResponseHeader, - EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, null, UnSubscribeResponseBody.class); + completeResponse(request, asyncContext, unSubscribeResponseHeader, EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, null, + UnSubscribeResponseBody.class); return; } // validate body - if (StringUtils.isAnyBlank(unSubscribeRequestBody.getUrl(), unSubscribeRequestBody.getConsumerGroup()) - || CollectionUtils.isEmpty(unSubscribeRequestBody.getTopics())) { - completeResponse(request, asyncContext, unSubscribeResponseHeader, - EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, null, UnSubscribeResponseBody.class); + if (StringUtils.isAnyBlank(unSubscribeRequestBody.getUrl(), unSubscribeRequestBody.getConsumerGroup()) || CollectionUtils.isEmpty( + unSubscribeRequestBody.getTopics())) { + completeResponse(request, asyncContext, unSubscribeResponseHeader, EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, null, + UnSubscribeResponseBody.class); return; } @@ -122,22 +115,19 @@ public void processRequest(final ChannelHandlerContext ctx, final AsyncContext localConsumerGroupMap = - subscriptionManager.getLocalConsumerGroupMapping(); + ConcurrentHashMap localConsumerGroupMap = subscriptionManager.getLocalConsumerGroupMapping(); synchronized (subscriptionManager.getLocalClientInfoMapping()) { boolean isChange = true; registerClient(unSubscribeRequestHeader, consumerGroup, unSubTopicList, unSubscribeUrl); for (final String unSubTopic : unSubTopicList) { - final List groupTopicClients = subscriptionManager.getLocalClientInfoMapping() - .get(consumerGroup + "@" + unSubTopic); + final List groupTopicClients = subscriptionManager.getLocalClientInfoMapping().get(consumerGroup + "@" + unSubTopic); final Iterator clientIterator = groupTopicClients.iterator(); while (clientIterator.hasNext()) { final Client client = clientIterator.next(); - if (StringUtils.equals(client.getPid(), pid) - && StringUtils.equals(client.getUrl(), unSubscribeUrl)) { + if (StringUtils.equals(client.getPid(), pid) && StringUtils.equals(client.getUrl(), unSubscribeUrl)) { LogUtils.warn(log, "client {} start unsubscribe", JsonUtils.toJSONString(client)); clientIterator.remove(); } @@ -182,17 +172,14 @@ public void processRequest(final ChannelHandlerContext ctx, final AsyncContext StringUtils.contains(s, consumerGroup)); + subscriptionManager.getLocalClientInfoMapping().keySet().removeIf(s -> StringUtils.contains(s, consumerGroup)); // clean ConsumerGroupInfo - localConsumerGroupMap.keySet() - .removeIf(s -> StringUtils.equals(consumerGroup, s)); + localConsumerGroupMap.keySet().removeIf(s -> StringUtils.equals(consumerGroup, s)); } catch (Exception e) { - completeResponse(request, asyncContext, unSubscribeResponseHeader, - EventMeshRetCode.EVENTMESH_UNSUBSCRIBE_ERR, - EventMeshRetCode.EVENTMESH_UNSUBSCRIBE_ERR.getErrMsg() + EventMeshUtil.stackTrace(e, 2), - UnSubscribeResponseBody.class); + completeResponse(request, asyncContext, unSubscribeResponseHeader, EventMeshRetCode.EVENTMESH_UNSUBSCRIBE_ERR, + EventMeshRetCode.EVENTMESH_UNSUBSCRIBE_ERR.getErrMsg() + EventMeshUtil.stackTrace(e, 2), UnSubscribeResponseBody.class); final long endTime = System.currentTimeMillis(); log.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|url={}", endTime - startTime, JsonUtils.toJSONString(unSubscribeRequestBody.getTopics()), unSubscribeRequestBody.getUrl(), e); @@ -234,9 +215,7 @@ public Executor executor() { return eventMeshHTTPServer.getHttpThreadPoolGroup().getClientManageExecutor(); } - private void registerClient(final UnSubscribeRequestHeader unSubscribeRequestHeader, - final String consumerGroup, - final List topicList, + private void registerClient(final UnSubscribeRequestHeader unSubscribeRequestHeader, final String consumerGroup, final List topicList, final String url) { for (final String topic : topicList) { final Client client = new Client(); @@ -251,8 +230,7 @@ private void registerClient(final UnSubscribeRequestHeader unSubscribeRequestHea client.setLastUpTime(new Date()); final String groupTopicKey = client.getConsumerGroup() + "@" + client.getTopic(); - ConcurrentHashMap> localClientInfoMap = eventMeshHTTPServer.getSubscriptionManager() - .getLocalClientInfoMapping(); + ConcurrentHashMap> localClientInfoMap = eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping(); if (localClientInfoMap.containsKey(groupTopicKey)) { final List localClients = localClientInfoMap.get(groupTopicKey); boolean isContains = false; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/WebHookProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/WebHookProcessor.java index 256b149b33..0e0aaa2753 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/WebHookProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/WebHookProcessor.java @@ -41,7 +41,7 @@ public class WebHookProcessor implements ShortHttpProcessor { @Override public String[] paths() { - return new String[]{WebHookOperationConstant.CALLBACK_PATH_PREFIX}; + return new String[] {WebHookOperationConstant.CALLBACK_PATH_PREFIX}; } @Override diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/HttpRequestProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/HttpRequestProcessor.java index 0cf66d3446..35231ef8b7 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/HttpRequestProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/HttpRequestProcessor.java @@ -17,7 +17,6 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor.inf; -import java.util.concurrent.Executor; import org.apache.eventmesh.common.protocol.http.HttpCommand; import org.apache.eventmesh.common.protocol.http.body.Body; import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode; @@ -28,6 +27,7 @@ import java.lang.reflect.Method; import java.util.Objects; +import java.util.concurrent.Executor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,7 +69,6 @@ default String getExtension(CloudEvent event, String protocolKey) { } /** - * * @return {@link Executor} */ Executor executor();