Skip to content

Commit

Permalink
fix code style
Browse files Browse the repository at this point in the history
  • Loading branch information
karsonto committed Jan 16, 2024
1 parent d7dded7 commit 24dab1a
Show file tree
Hide file tree
Showing 25 changed files with 80 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@

import lombok.extern.slf4j.Slf4j;


/**
* Add multiple managers to the underlying server
*/
Expand All @@ -83,22 +84,16 @@ public class EventMeshHTTPServer extends AbstractHTTPServer {

private final Acl acl;
private final EventBus eventBus = new EventBus();

private final transient HTTPClientPool httpClientPool = new HTTPClientPool(10);
private ConsumerManager consumerManager;
private ProducerManager producerManager;
private SubscriptionManager subscriptionManager;

private FilterEngine filterEngine;

private TransformerEngine transformerEngine;

private HttpRetryer httpRetryer;

private transient RateLimiter msgRateLimiter;
private transient RateLimiter batchRateLimiter;

private final transient HTTPClientPool httpClientPool = new HTTPClientPool(10);

public EventMeshHTTPServer(final EventMeshServer eventMeshServer, final EventMeshHTTPConfiguration eventMeshHttpConfiguration) {

super(eventMeshHttpConfiguration.getHttpServerPort(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,17 @@

package org.apache.eventmesh.runtime.core.protocol.http.processor;

import java.util.concurrent.Executor;
import org.apache.eventmesh.common.protocol.http.HttpCommand;
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;

import java.util.concurrent.Executor;

import io.netty.channel.ChannelHandlerContext;

import lombok.RequiredArgsConstructor;


@RequiredArgsConstructor
public class AdminMetricsProcessor extends AbstractHttpRequestProcessor {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.eventmesh.runtime.core.protocol.http.processor;

import java.util.concurrent.Executor;
import org.apache.eventmesh.common.protocol.http.HttpCommand;
import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
Expand All @@ -27,6 +26,8 @@
import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
import org.apache.eventmesh.runtime.util.RemotingHelper;

import java.util.concurrent.Executor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -57,6 +58,8 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand>

@Override
public Executor executor() {
return (Runnable runnable)-> {runnable.run();};
return (Runnable runnable) -> {
runnable.run();
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.eventmesh.runtime.core.protocol.http.processor;

import java.util.concurrent.Executor;
import org.apache.eventmesh.api.SendCallback;
import org.apache.eventmesh.api.SendResult;
import org.apache.eventmesh.api.exception.OnExceptionContext;
Expand Down Expand Up @@ -53,6 +52,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.eventmesh.runtime.core.protocol.http.processor;

import java.util.concurrent.Executor;
import org.apache.eventmesh.api.SendCallback;
import org.apache.eventmesh.api.SendResult;
import org.apache.eventmesh.api.exception.OnExceptionContext;
Expand Down Expand Up @@ -49,6 +48,7 @@
import org.apache.commons.lang3.StringUtils;

import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.eventmesh.runtime.core.protocol.http.processor;

import java.util.concurrent.Executor;
import org.apache.eventmesh.common.protocol.http.HttpEventWrapper;
import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
Expand All @@ -37,6 +36,7 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.eventmesh.runtime.core.protocol.http.processor;

import java.util.concurrent.Executor;
import org.apache.eventmesh.common.protocol.http.HttpEventWrapper;
import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
Expand All @@ -39,6 +38,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.eventmesh.runtime.core.protocol.http.processor;

import java.util.concurrent.Executor;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.enums.ConnectionType;
import org.apache.eventmesh.common.protocol.http.HttpEventWrapper;
Expand All @@ -43,6 +42,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.eventmesh.runtime.core.protocol.http.processor;

import java.util.concurrent.Executor;
import org.apache.eventmesh.common.protocol.http.HttpCommand;
import org.apache.eventmesh.common.protocol.http.body.client.HeartbeatRequestBody;
import org.apache.eventmesh.common.protocol.http.body.client.HeartbeatResponseBody;
Expand Down Expand Up @@ -46,6 +45,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

import io.netty.channel.ChannelHandlerContext;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@

package org.apache.eventmesh.runtime.core.protocol.http.processor;

import java.util.concurrent.Executor;

import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import java.util.concurrent.Executor;

/**
* http processor
Expand All @@ -31,8 +32,9 @@ public interface HttpProcessor {
HttpResponse handler(HttpRequest httpRequest);

/**
*
* @return {@link Executor}
*/
default Executor executor() {return null;}
default Executor executor() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.eventmesh.runtime.core.protocol.http.processor;

import java.util.concurrent.Executor;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.http.HttpEventWrapper;
Expand All @@ -41,6 +40,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executor;

import io.netty.channel.Channel;
import io.netty.handler.codec.http.HttpRequest;
Expand Down Expand Up @@ -186,7 +186,7 @@ public void handler(final HandlerService.HandlerSpecific handlerSpecific, final

@Override
public String[] paths() {
return new String[]{RequestURI.SUBSCRIBE_LOCAL.getRequestURI()};
return new String[] {RequestURI.SUBSCRIBE_LOCAL.getRequestURI()};
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.eventmesh.runtime.core.protocol.http.processor;

import java.util.concurrent.Executor;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.http.HttpEventWrapper;
import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
Expand Down Expand Up @@ -49,6 +48,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executor;

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.HttpRequest;
Expand Down Expand Up @@ -215,6 +215,7 @@ public void handler(final HandlerService.HandlerSpecific handlerSpecific, final
} catch (Exception e) {
log.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms"
+ "|topic={}|url={}", System.currentTimeMillis() - startTime, JsonUtils.toJSONString(unSubTopicList), unSubscribeUrl, e);

handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_UNSUBSCRIBE_ERR, responseHeaderMap,
responseBodyMap, null);
}
Expand All @@ -227,7 +228,7 @@ public void handler(final HandlerService.HandlerSpecific handlerSpecific, final

@Override
public String[] paths() {
return new String[]{RequestURI.UNSUBSCRIBE_LOCAL.getRequestURI()};
return new String[] {RequestURI.UNSUBSCRIBE_LOCAL.getRequestURI()};
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.eventmesh.runtime.core.protocol.http.processor;

import java.util.concurrent.Executor;
import org.apache.eventmesh.common.protocol.http.HttpEventWrapper;
import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
Expand All @@ -34,6 +33,7 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.eventmesh.runtime.core.protocol.http.processor;

import java.util.concurrent.Executor;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.http.HttpEventWrapper;
Expand All @@ -44,6 +43,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -218,7 +218,7 @@ public void handler(HandlerService.HandlerSpecific handlerSpecific, HttpRequest

@Override
public String[] paths() {
return new String[]{RequestURI.SUBSCRIBE_REMOTE.getRequestURI()};
return new String[] {RequestURI.SUBSCRIBE_REMOTE.getRequestURI()};
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.eventmesh.runtime.core.protocol.http.processor;

import java.util.concurrent.Executor;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.http.HttpEventWrapper;
Expand All @@ -44,6 +43,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;

import org.slf4j.Logger;
Expand Down Expand Up @@ -185,7 +185,7 @@ public void handler(HandlerService.HandlerSpecific handlerSpecific, HttpRequest

@Override
public String[] paths() {
return new String[]{RequestURI.UNSUBSCRIBE_REMOTE.getRequestURI()};
return new String[] {RequestURI.UNSUBSCRIBE_REMOTE.getRequestURI()};
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.eventmesh.runtime.core.protocol.http.processor;

import java.util.concurrent.Executor;
import org.apache.eventmesh.api.SendCallback;
import org.apache.eventmesh.api.SendResult;
import org.apache.eventmesh.api.exception.OnExceptionContext;
Expand Down Expand Up @@ -48,6 +47,7 @@
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;

import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.eventmesh.runtime.core.protocol.http.processor;

import java.util.concurrent.Executor;
import org.apache.eventmesh.api.SendCallback;
import org.apache.eventmesh.api.SendResult;
import org.apache.eventmesh.api.exception.AclException;
Expand Down Expand Up @@ -53,6 +52,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

import io.cloudevents.CloudEvent;
Expand Down Expand Up @@ -309,7 +309,7 @@ public void onException(final OnExceptionContext context) {

@Override
public String[] paths() {
return new String[]{RequestURI.PUBLISH.getRequestURI()};
return new String[] {RequestURI.PUBLISH.getRequestURI()};
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.eventmesh.runtime.core.protocol.http.processor;

import java.util.concurrent.Executor;
import org.apache.eventmesh.api.SendCallback;
import org.apache.eventmesh.api.SendResult;
import org.apache.eventmesh.api.exception.OnExceptionContext;
Expand Down Expand Up @@ -52,6 +51,7 @@
import org.apache.commons.lang3.StringUtils;

import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
Expand Down Expand Up @@ -89,7 +89,7 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand>
HttpCommand request = asyncContext.getRequest();
String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
CMD_LOGGER.info("cmd={}|{}|client2eventMesh|from={}|to={}", RequestCode.get(
Integer.valueOf(request.getRequestCode())),
Integer.valueOf(request.getRequestCode())),
EventMeshConstants.PROTOCOL_HTTP,
remoteAddr, localAddress);

Expand Down
Loading

0 comments on commit 24dab1a

Please sign in to comment.