diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java index 53a5bd976ce..757db41afdd 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java @@ -68,9 +68,6 @@ public class CommonConstants { public static final String PROXY_SENDER_MAX_RETRY = "proxy.sender.maxRetry"; public static final int DEFAULT_PROXY_SENDER_MAX_RETRY = 5; - public static final String PROXY_IS_FILE = "proxy.isFile"; - public static final boolean DEFAULT_IS_FILE = false; - public static final String PROXY_CLIENT_IO_THREAD_NUM = "client.iothread.num"; public static final int DEFAULT_PROXY_CLIENT_IO_THREAD_NUM = Runtime.getRuntime().availableProcessors(); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java index 984baf6de63..a37a171a372 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java @@ -87,7 +87,6 @@ public class SenderManager { private final int aliveConnectionNum; private final boolean isCompress; private final int msgType; - private final boolean isFile; private final long maxSenderTimeout; private final int maxSenderRetry; private final long retrySleepTime; @@ -133,7 +132,6 @@ public SenderManager(InstanceProfile profile, String inlongGroupId, String sourc CommonConstants.PROXY_SENDER_MAX_RETRY, CommonConstants.DEFAULT_PROXY_SENDER_MAX_RETRY); retrySleepTime = agentConf.getLong( CommonConstants.PROXY_RETRY_SLEEP, CommonConstants.DEFAULT_PROXY_RETRY_SLEEP); - isFile = profile.getBoolean(CommonConstants.PROXY_IS_FILE, CommonConstants.DEFAULT_IS_FILE); ioThreadNum = profile.getInt(CommonConstants.PROXY_CLIENT_IO_THREAD_NUM, CommonConstants.DEFAULT_PROXY_CLIENT_IO_THREAD_NUM); enableBusyWait = profile.getBoolean(CommonConstants.PROXY_CLIENT_ENABLE_BUSY_WAIT, @@ -200,7 +198,6 @@ private void createMessageSender() throws Exception { ProxyClientConfig proxyClientConfig = new ProxyClientConfig(managerAddr, inlongGroupId, authSecretId, authSecretKey); proxyClientConfig.setTotalAsyncCallbackSize(totalAsyncBufSize); - proxyClientConfig.setFile(isFile); proxyClientConfig.setAliveConnections(aliveConnectionNum); proxyClientConfig.setIoThreadNum(ioThreadNum); diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java index ec3eff3badc..623e84d8c01 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java @@ -21,7 +21,6 @@ import org.apache.inlong.common.msg.AttributeConstants; import org.apache.inlong.common.util.MessageUtils; import org.apache.inlong.sdk.dataproxy.codec.EncodeObject; -import org.apache.inlong.sdk.dataproxy.common.FileCallback; import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback; import org.apache.inlong.sdk.dataproxy.common.SendResult; import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry; @@ -833,122 +832,6 @@ private void addIndexCnt(String groupId, String streamId, long cnt) { } } - @Deprecated - public void asyncsendMessageData(FileCallback callback, List bodyList, String groupId, String streamId, - long dt, int sid, boolean isSupportLF, String msgUUID, long timeout, TimeUnit timeUnit, - Map extraAttrMap) throws ProxysdkException { - dt = ProxyUtils.covertZeroDt(dt); - if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt) - || !ProxyUtils.isAttrKeysValid(extraAttrMap)) { - throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString()); - } - if (!ProxyUtils.isBodyLengthValid(bodyList, maxPacketLength)) { - throw new ProxysdkException(SendResult.BODY_EXCEED_MAX_LEN.toString()); - } - addIndexCnt(groupId, streamId, bodyList.size()); - - StringBuilder attrs = MessageUtils.convertAttrToStr(extraAttrMap); - - if (msgtype == 7 || msgtype == 8) { - EncodeObject encodeObject = new EncodeObject(bodyList, msgtype, - isCompress, isReport, isGroupIdTransfer, - dt / 1000, sid, groupId, streamId, attrs.toString(), "data", ""); - encodeObject.setSupportLF(isSupportLF); - sender.asyncSendMessageIndex(encodeObject, callback, msgUUID, timeout, timeUnit); - } - } - - @Deprecated - private void asyncSendMetric(FileCallback callback, byte[] body, String groupId, String streamId, long dt, int sid, - String ip, String msgUUID, long timeout, TimeUnit timeUnit, String messageKey) throws ProxysdkException { - dt = ProxyUtils.covertZeroDt(dt); - if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) { - throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString()); - } - if (!ProxyUtils.isBodyLengthValid(body, maxPacketLength)) { - throw new ProxysdkException(SendResult.BODY_EXCEED_MAX_LEN.toString()); - } - boolean isCompressEnd = false; - if (msgtype == 7 || msgtype == 8) { - sender.asyncSendMessageIndex(new EncodeObject(body, msgtype, isCompressEnd, - isReport, isGroupIdTransfer, dt / 1000, - sid, groupId, streamId, "", messageKey, ip), callback, msgUUID, timeout, timeUnit); - } - } - - @Deprecated - public void asyncsendMessageProxy(FileCallback callback, byte[] body, String groupId, String streamId, long dt, - int sid, String ip, String msgUUID, long timeout, TimeUnit timeUnit) throws ProxysdkException { - asyncSendMetric(callback, body, groupId, streamId, dt, sid, ip, msgUUID, timeout, - timeUnit, "minute"); - } - - @Deprecated - public void asyncsendMessageFile(FileCallback callback, byte[] body, String groupId, String streamId, long dt, - int sid, String msgUUID, long timeout, TimeUnit timeUnit) throws ProxysdkException { - asyncSendMetric(callback, body, groupId, streamId, dt, sid, "", msgUUID, timeout, timeUnit, - "file"); - } - - @Deprecated - public String sendMessageData(List bodyList, String groupId, String streamId, long dt, int sid, - boolean isSupportLF, String msgUUID, long timeout, TimeUnit timeUnit, Map extraAttrMap) { - dt = ProxyUtils.covertZeroDt(dt); - if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt) - || !ProxyUtils.isAttrKeysValid(extraAttrMap)) { - return SendResult.INVALID_ATTRIBUTES.toString(); - } - if (!ProxyUtils.isBodyLengthValid(bodyList, maxPacketLength)) { - return SendResult.BODY_EXCEED_MAX_LEN.toString(); - } - addIndexCnt(groupId, streamId, bodyList.size()); - - StringBuilder attrs = MessageUtils.convertAttrToStr(extraAttrMap); - - if (msgtype == 7 || msgtype == 8) { - EncodeObject encodeObject = new EncodeObject(bodyList, msgtype, isCompress, - isReport, isGroupIdTransfer, dt / 1000, - sid, groupId, streamId, attrs.toString(), "data", ""); - encodeObject.setSupportLF(isSupportLF); - Function sendOperation = (sender) -> sender.syncSendMessageIndex(encodeObject, msgUUID, - timeout, timeUnit); - return attemptSendMessageIndex(sendOperation); - } - return null; - } - - @Deprecated - private String sendMetric(byte[] body, String groupId, String streamId, long dt, int sid, String ip, String msgUUID, - long timeout, TimeUnit timeUnit, String messageKey) { - dt = ProxyUtils.covertZeroDt(dt); - if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) { - return SendResult.INVALID_ATTRIBUTES.toString(); - } - if (!ProxyUtils.isBodyLengthValid(body, maxPacketLength)) { - return SendResult.BODY_EXCEED_MAX_LEN.toString(); - } - if (msgtype == 7 || msgtype == 8) { - EncodeObject encodeObject = new EncodeObject(body, msgtype, false, isReport, - isGroupIdTransfer, dt / 1000, sid, groupId, streamId, "", messageKey, ip); - Function sendOperation = (sender) -> sender.syncSendMessageIndex(encodeObject, msgUUID, - timeout, timeUnit); - return attemptSendMessageIndex(sendOperation); - } - return null; - } - - @Deprecated - public String sendMessageProxy(byte[] body, String groupId, String streamId, long dt, int sid, String ip, - String msgUUID, long timeout, TimeUnit timeUnit) { - return sendMetric(body, groupId, streamId, dt, sid, ip, msgUUID, timeout, timeUnit, "minute"); - } - - @Deprecated - public String sendMessageFile(byte[] body, String groupId, String streamId, long dt, int sid, String msgUUID, - long timeout, TimeUnit timeUnit) { - return sendMetric(body, groupId, streamId, dt, sid, "", msgUUID, timeout, timeUnit, "file"); - } - private void shutdownInternalThreads() { indexCol.shutDown(); MANAGER_FETCHER_THREAD_STARTED.set(false); diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java index d74f876fabe..c3253805cb4 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java @@ -39,7 +39,6 @@ public class ProxyClientConfig { private int proxyUpdateIntervalMinutes; private int proxyUpdateMaxRetry; private String inlongGroupId; - private boolean isFile = false; private boolean requestByHttp = true; private boolean isNeedDataEncry = false; private boolean needAuthentication = false; @@ -196,14 +195,6 @@ public boolean isRequestByHttp() { return requestByHttp; } - public boolean isFile() { - return isFile; - } - - public void setFile(boolean file) { - isFile = file; - } - public String getInlongGroupId() { return inlongGroupId; } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/FileCallback.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/FileCallback.java deleted file mode 100644 index 8fce78257e0..00000000000 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/FileCallback.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sdk.dataproxy.common; - -public abstract class FileCallback implements SendMessageCallback { - - /* Invoked when a message is confirmed by TDBus. */ - public void onMessageAck(String result) { - } - - ; - - public void onMessageAck(SendResult result) { - } - - ; - - /* Invoked when a message transportation interrupted by an exception. */ - public void onException(Throwable e) { - } - - ; -} diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendMessageCallback.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendMessageCallback.java index fc807050314..9e83f4673c4 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendMessageCallback.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendMessageCallback.java @@ -20,8 +20,8 @@ public interface SendMessageCallback { /* Invoked when a message is confirmed by TDBus. */ - public void onMessageAck(SendResult result); + void onMessageAck(SendResult result); /* Invoked when a message transportation interrupted by an exception. */ - public void onException(Throwable e); + void onException(Throwable e); } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/MyFileCallBack.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/MyFileCallBack.java deleted file mode 100644 index 3685d0ad536..00000000000 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/MyFileCallBack.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sdk.dataproxy.example; - -import org.apache.inlong.sdk.dataproxy.DefaultMessageSender; -import org.apache.inlong.sdk.dataproxy.common.FileCallback; -import org.apache.inlong.sdk.dataproxy.common.SendResult; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class MyFileCallBack extends FileCallback { - - private static final Logger logger = LoggerFactory - .getLogger(MyFileCallBack.class); - private DefaultMessageSender messageSender = null; - private Event event = null; - - public MyFileCallBack() { - - } - - public MyFileCallBack(DefaultMessageSender messageSender, Event event) { - super(); - this.messageSender = messageSender; - this.event = event; - } - - public void onMessageAck(String result) { - logger.info("onMessageAck return result = {}", result); - } - - public void onMessageAck(SendResult result) { - if (result == SendResult.OK) { - logger.info("onMessageAck return Ok"); - } else { - logger.info("onMessageAck return failure = {}", result); - } - } - - public void onException(Throwable e) { - logger.error("Send message failure, error {}", e.getMessage()); - e.printStackTrace(); - } - -} diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/MyMessageCallBack.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/MyMessageCallBack.java index 7aef6e705cd..d9b5c081327 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/MyMessageCallBack.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/MyMessageCallBack.java @@ -18,16 +18,17 @@ package org.apache.inlong.sdk.dataproxy.example; import org.apache.inlong.sdk.dataproxy.DefaultMessageSender; -import org.apache.inlong.sdk.dataproxy.common.FileCallback; +import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback; import org.apache.inlong.sdk.dataproxy.common.SendResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class MyMessageCallBack extends FileCallback { +public class MyMessageCallBack implements SendMessageCallback { private static final Logger logger = LoggerFactory .getLogger(MyMessageCallBack.class); + private DefaultMessageSender messageSender = null; private Event event = null; @@ -41,10 +42,7 @@ public MyMessageCallBack(DefaultMessageSender messageSender, Event event) { this.event = event; } - public void onMessageAck(String result) { - logger.info("onMessageAck return result = {}", result); - } - + @Override public void onMessageAck(SendResult result) { if (result == SendResult.OK) { logger.info("onMessageAck return Ok"); @@ -53,6 +51,7 @@ public void onMessageAck(SendResult result) { } } + @Override public void onException(Throwable e) { logger.error("Send message failure, error {}", e.getMessage()); e.printStackTrace(); diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java index 9581da1f805..50b3105b567 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java @@ -19,7 +19,6 @@ import org.apache.inlong.sdk.dataproxy.ProxyClientConfig; import org.apache.inlong.sdk.dataproxy.codec.EncodeObject; -import org.apache.inlong.sdk.dataproxy.common.FileCallback; import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback; import org.apache.inlong.sdk.dataproxy.common.SendResult; import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry; @@ -63,7 +62,6 @@ public class Sender { private final TimeoutScanThread scanThread; private final ClientMgr clientMgr; private final ProxyClientConfig configure; - private final boolean isFile; private MetricWorkerThread metricWorker = null; private int clusterId = -1; @@ -98,7 +96,6 @@ public Sender(ProxyClientConfig configure, ThreadFactory selfDefineFactory) thro throw new Exception("In OutNetwork isNeedDataEncry must be true!"); } } - this.isFile = configure.isFile(); scanThread = new TimeoutScanThread(callbacks, currentBufferSize, configure, clientMgr); scanThread.start(); @@ -172,15 +169,8 @@ public void notifyCallback(Channel channel, String messageId, SendResult result) if (callback == null) { return; } - if (isFile) { - String proxyip = channel.remoteAddress().toString(); - ((FileCallback) callback.getCallback()).onMessageAck(result.toString() - + "=" + proxyip.substring(1, proxyip.indexOf(':'))); - currentBufferSize.addAndGet(-callback.getSize()); - } else { - callback.getCallback().onMessageAck(result); - currentBufferSize.decrementAndGet(); - } + callback.getCallback().onMessageAck(result); + currentBufferSize.decrementAndGet(); } private SendResult syncSendInternalMessage(NettyClient client, EncodeObject encodeObject, String msgUUID, @@ -287,190 +277,6 @@ public SendResult syncSendMessage(EncodeObject encodeObject, String msgUUID, lon return message; } - private SendResult syncSendMessageIndexInternal(NettyClient client, EncodeObject encodeObject, String msgUUID, - long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException { - if (client == null || !client.isActive()) { - chooseProxy.remove(encodeObject.getMessageId()); - client = clientMgr.getClientByRoundRobin(); - if (client == null) { - return SendResult.NO_CONNECTION; - } - chooseProxy.put(encodeObject.getMessageId(), client); - } - - if (encodeObject.getMsgtype() == 7) { - int groupIdnum = 0; - int streamIdnum = 0; - if (encodeObject.getGroupId().equals(clientMgr.getGroupId())) { - groupIdnum = clientMgr.getGroupIdNum(); - streamIdnum = clientMgr.getStreamIdMap().get(encodeObject.getStreamId()) != null - ? clientMgr.getStreamIdMap().get(encodeObject.getStreamId()) - : 0; - } - encodeObject.setGroupIdNum(groupIdnum); - encodeObject.setStreamIdNum(streamIdnum); - if (groupIdnum == 0 || streamIdnum == 0) { - encodeObject.setGroupIdTransfer(false); - } - } - if (this.configure.isNeedDataEncry()) { - encodeObject.setEncryptEntry(true, configure.getUserName(), clientMgr.getEncryptConfigEntry()); - } else { - encodeObject.setEncryptEntry(false, null, null); - } - encodeObject.setMsgUUID(msgUUID); - SyncMessageCallable callable = new SyncMessageCallable(client, encodeObject, timeout, timeUnit); - syncCallables.put(encodeObject.getMessageId(), callable); - - Future future = threadPool.submit(callable); - return future.get(timeout, timeUnit); - } - - /** - * sync send - * - * @param encodeObject - * @param msgUUID - * @param timeout - * @param timeUnit - * @return - */ - public String syncSendMessageIndex(EncodeObject encodeObject, String msgUUID, long timeout, TimeUnit timeUnit) { - try { - SendResult message = null; - NettyClient client = chooseProxy.get(encodeObject.getMessageId()); - String proxyip = encodeObject.getProxyIp(); - if (proxyip != null && proxyip.length() != 0) { - client = clientMgr.getContainProxy(proxyip); - } - if (isNotValidateAttr(encodeObject.getCommonattr(), encodeObject.getAttributes())) { - LOGGER.error("error attr format {} {}", encodeObject.getCommonattr(), - encodeObject.getAttributes()); - return SendResult.INVALID_ATTRIBUTES.toString(); - } - try { - message = syncSendMessageIndexInternal(client, encodeObject, - msgUUID, timeout, timeUnit); - } catch (InterruptedException e) { - // TODO Auto-generated catch block - LOGGER.error("send message error {}", getExceptionStack(e)); - syncCallables.remove(encodeObject.getMessageId()); - return SendResult.THREAD_INTERRUPT.toString(); - } catch (ExecutionException e) { - // TODO Auto-generated catch block - LOGGER.error("ExecutionException {}", getExceptionStack(e)); - syncCallables.remove(encodeObject.getMessageId()); - return SendResult.UNKOWN_ERROR.toString(); - } catch (TimeoutException e) { - // TODO Auto-generated catch block - LOGGER.error("TimeoutException {}", getExceptionStack(e)); - // e.printStackTrace(); - SyncMessageCallable syncMessageCallable = syncCallables.remove(encodeObject.getMessageId()); - if (syncMessageCallable != null) { - NettyClient tmpClient = syncMessageCallable.getClient(); - if (tmpClient != null) { - Channel curChannel = tmpClient.getChannel(); - if (curChannel != null) { - LOGGER.error("channel maybe busy {}", curChannel); - scanThread.addTimeoutChannel(curChannel); - } - } - } - return SendResult.TIMEOUT.toString(); - } catch (Throwable e) { - LOGGER.error("syncSendMessage exception {}", getExceptionStack(e)); - syncCallables.remove(encodeObject.getMessageId()); - return SendResult.UNKOWN_ERROR.toString(); - } - scanThread.resetTimeoutChannel(client.getChannel()); - return message.toString() + "=" + client.getServerIP(); - } catch (Exception e) { - LOGGER.error("agent send error {}", getExceptionStack(e)); - syncCallables.remove(encodeObject.getMessageId()); - return SendResult.UNKOWN_ERROR.toString(); - } - } - - /** - * async send message index - * - * @param encodeObject - * @param callback - * @param msgUUID - * @param timeout - * @param timeUnit - * @throws ProxysdkException - */ - public void asyncSendMessageIndex(EncodeObject encodeObject, FileCallback callback, String msgUUID, long timeout, - TimeUnit timeUnit) throws ProxysdkException { - NettyClient client = chooseProxy.get(encodeObject.getMessageId()); - String proxyip = encodeObject.getProxyIp(); - if (proxyip != null && proxyip.length() != 0) { - client = clientMgr.getContainProxy(proxyip); - } - if (client == null || !client.isActive()) { - chooseProxy.remove(encodeObject.getMessageId()); - client = clientMgr.getClientByRoundRobin(); - if (client == null) { - throw new ProxysdkException(SendResult.NO_CONNECTION.toString()); - } - chooseProxy.put(encodeObject.getMessageId(), client); - } - if (currentBufferSize.get() >= asyncCallbackMaxSize) { - throw new ProxysdkException("ASYNC_CALLBACK_BUFFER_FULL"); - } - int size = 1; - if (isFile) { - if (encodeObject.getBodyBytes() != null) { - size = encodeObject.getBodyBytes().length; - } else { - for (byte[] bytes : encodeObject.getBodylist()) { - size = size + bytes.length; - } - } - if (currentBufferSize.addAndGet(size) >= asyncCallbackMaxSize) { - currentBufferSize.addAndGet(-size); - throw new ProxysdkException("ASYNC_CALLBACK_BUFFER_FULL"); - } - - } else { - if (currentBufferSize.incrementAndGet() >= asyncCallbackMaxSize) { - currentBufferSize.decrementAndGet(); - throw new ProxysdkException("ASYNC_CALLBACK_BUFFER_FULL"); - } - } - ConcurrentHashMap tmpCallBackMap = new ConcurrentHashMap<>(); - ConcurrentHashMap msgQueueMap = callbacks.putIfAbsent( - client.getChannel(), tmpCallBackMap); - if (msgQueueMap == null) { - msgQueueMap = tmpCallBackMap; - } - msgQueueMap.put(encodeObject.getMessageId(), new QueueObject(System.currentTimeMillis(), - callback, size, timeout, timeUnit)); - if (encodeObject.getMsgtype() == 7) { - int groupIdnum = 0; - int streamIdnum = 0; - if ((clientMgr.getGroupId().length() != 0) && (encodeObject.getGroupId().equals(clientMgr.getGroupId()))) { - groupIdnum = clientMgr.getGroupIdNum(); - streamIdnum = (clientMgr.getStreamIdMap().get(encodeObject.getStreamId()) != null) - ? clientMgr.getStreamIdMap().get(encodeObject.getStreamId()) - : 0; - } - encodeObject.setGroupIdNum(groupIdnum); - encodeObject.setStreamIdNum(streamIdnum); - if (groupIdnum == 0 || streamIdnum == 0) { - encodeObject.setGroupIdTransfer(false); - } - } - if (this.configure.isNeedDataEncry()) { - encodeObject.setEncryptEntry(true, configure.getUserName(), clientMgr.getEncryptConfigEntry()); - } else { - encodeObject.setEncryptEntry(false, null, null); - } - encodeObject.setMsgUUID(msgUUID); - client.write(encodeObject); - } - /** * whether is validate * @@ -539,24 +345,9 @@ public void asyncSendMessage(EncodeObject encodeObject, SendMessageCallback call throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString()); } int size = 1; - if (isFile) { - if (encodeObject.getBodyBytes() != null) { - size = encodeObject.getBodyBytes().length; - } else { - for (byte[] bytes : encodeObject.getBodylist()) { - size = size + bytes.length; - } - } - if (currentBufferSize.addAndGet(size) >= asyncCallbackMaxSize) { - currentBufferSize.addAndGet(-size); - throw new ProxysdkException("ASYNC_CALLBACK_BUFFER_FULL"); - } - - } else { - if (currentBufferSize.incrementAndGet() >= asyncCallbackMaxSize) { - currentBufferSize.decrementAndGet(); - throw new ProxysdkException("ASYNC_CALLBACK_BUFFER_FULL"); - } + if (currentBufferSize.incrementAndGet() >= asyncCallbackMaxSize) { + currentBufferSize.decrementAndGet(); + throw new ProxysdkException("ASYNC_CALLBACK_BUFFER_FULL"); } ConcurrentHashMap msgQueueMap = callbacks.computeIfAbsent(client.getChannel(), (k) -> new ConcurrentHashMap<>()); @@ -623,14 +414,8 @@ public void notifyConnectionDisconnected(Channel channel) { if (queueObject == null) { continue; } - if (isFile) { - ((FileCallback) queueObject.getCallback()) - .onMessageAck(SendResult.CONNECTION_BREAK.toString()); - currentBufferSize.addAndGet(-queueObject.getSize()); - } else { - queueObject.getCallback().onMessageAck(SendResult.CONNECTION_BREAK); - currentBufferSize.decrementAndGet(); - } + queueObject.getCallback().onMessageAck(SendResult.CONNECTION_BREAK); + currentBufferSize.decrementAndGet(); } msgQueueMap.clear(); } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java index 270531bf5bf..e7358503608 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java @@ -19,7 +19,7 @@ import org.apache.inlong.sdk.dataproxy.ProxyClientConfig; import org.apache.inlong.sdk.dataproxy.codec.EncodeObject; -import org.apache.inlong.sdk.dataproxy.common.FileCallback; +import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback; import org.apache.inlong.sdk.dataproxy.common.SendResult; import org.apache.inlong.sdk.dataproxy.metric.MessageRecord; import org.apache.inlong.sdk.dataproxy.metric.MetricConfig; @@ -192,7 +192,7 @@ private void tryToSendMetricToManager(EncodeObject encodeObject, MetricSendCallB callBack.increaseRetry(); try { if (callBack.getRetryCount() < 4) { - sender.asyncSendMessageIndex(encodeObject, callBack, + sender.asyncSendMessage(encodeObject, callBack, String.valueOf(System.currentTimeMillis()), 20, TimeUnit.SECONDS); } else { logger.error("Send metric failure: {} {}", encodeObject.getBodyBytes(), encodeObject.getBodylist()); @@ -267,7 +267,7 @@ private void flushMetric(boolean isClosing) { } } - private class MetricSendCallBack extends FileCallback { + private class MetricSendCallBack implements SendMessageCallback { private final EncodeObject encodeObject; private int retryCount = 0; @@ -285,17 +285,17 @@ public int getRetryCount() { } @Override - public void onMessageAck(String result) { - if (!SendResult.OK.toString().equals(result)) { - tryToSendMetricToManager(encodeObject, this); - } else { + public void onMessageAck(SendResult result) { + if (!SendResult.OK.equals(result)) { logger.debug("Send metric is ok!"); + } else { + tryToSendMetricToManager(encodeObject, this); } } @Override - public void onMessageAck(SendResult result) { - + public void onException(Throwable e) { + // } } } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/TimeoutScanThread.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/TimeoutScanThread.java index 8c77eae1092..f9e49802643 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/TimeoutScanThread.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/TimeoutScanThread.java @@ -18,7 +18,6 @@ package org.apache.inlong.sdk.dataproxy.threads; import org.apache.inlong.sdk.dataproxy.ProxyClientConfig; -import org.apache.inlong.sdk.dataproxy.common.FileCallback; import org.apache.inlong.sdk.dataproxy.common.SendResult; import org.apache.inlong.sdk.dataproxy.network.ClientMgr; import org.apache.inlong.sdk.dataproxy.network.QueueObject; @@ -143,13 +142,8 @@ private void checkMessageIdBasedCallbacks(Channel channel, // remove it before callback QueueObject queueObject1 = messageIdCallbacks.remove(messageId); if (queueObject1 != null) { - if (config.isFile()) { - ((FileCallback) queueObject1.getCallback()).onMessageAck(SendResult.TIMEOUT.toString()); - currentBufferSize.addAndGet(-queueObject1.getSize()); - } else { - queueObject1.getCallback().onMessageAck(SendResult.TIMEOUT); - currentBufferSize.decrementAndGet(); - } + queueObject1.getCallback().onMessageAck(SendResult.TIMEOUT); + currentBufferSize.decrementAndGet(); } addTimeoutChannel(channel); }