diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentStatusManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentStatusManager.java index 307c79f1da..7292c1a577 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentStatusManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentStatusManager.java @@ -42,7 +42,6 @@ import java.nio.charset.StandardCharsets; import java.text.SimpleDateFormat; import java.util.List; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME; @@ -168,7 +167,7 @@ private void doSendStatusMsg(DefaultMessageSender sender) { INLONG_AGENT_SYSTEM, INLONG_AGENT_STATUS, AgentUtils.getCurrentTime(), - "", 30, TimeUnit.SECONDS); + ""); if (ret != SendResult.OK) { LOGGER.error("send status failed: ret {}", ret); } diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/FileStaticManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/FileStaticManager.java index abda6a2eab..774dee4247 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/FileStaticManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/FileStaticManager.java @@ -34,7 +34,6 @@ import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME; import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_TAG; @@ -135,7 +134,7 @@ private void doSendStaticMsg(DefaultMessageSender sender) { INLONG_AGENT_SYSTEM, INLONG_FILE_STATIC, AgentUtils.getCurrentTime(), - "", 30, TimeUnit.SECONDS); + ""); if (ret != SendResult.OK) { LOGGER.error("send static failed: ret {}", ret); } diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java index d436a32636..ead9439114 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java @@ -201,6 +201,7 @@ private void createMessageSender() { proxyClientConfig.setAliveConnections(CommonConstants.DEFAULT_PROXY_ALIVE_CONNECTION_NUM); proxyClientConfig.setIoThreadNum(CommonConstants.DEFAULT_PROXY_CLIENT_IO_THREAD_NUM); proxyClientConfig.setProtocolType(ProtocolType.TCP); + proxyClientConfig.setRequestTimeoutMs(30000L); ThreadFactory SHARED_FACTORY = new DefaultThreadFactory("agent-sender-manager-heartbeat", Thread.currentThread().isDaemon()); sender = new DefaultMessageSender(proxyClientConfig, SHARED_FACTORY); diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java index 40d71d2859..26f8d131b4 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java @@ -17,8 +17,6 @@ package org.apache.inlong.sdk.dataproxy; -import java.util.concurrent.TimeUnit; - public class ConfigConstants { public static final String PROXY_SDK_VERSION = "1.2.11"; @@ -49,12 +47,13 @@ public class ConfigConstants { public static final int MAX_LINE_CNT = 30; - /* Default connection,connect timeout in milliseconds. */ - public static final long DEFAULT_CONNECT_TIMEOUT_MILLIS = - TimeUnit.MILLISECONDS.convert(40, TimeUnit.SECONDS); - - public static final long DEFAULT_REQUEST_TIMEOUT_MILLIS = - TimeUnit.MILLISECONDS.convert(40, TimeUnit.SECONDS); + // connection timeout in milliseconds + public static final long VAL_DEF_CONNECT_TIMEOUT_MS = 20000L; + public static final long VAL_MIN_CONNECT_TIMEOUT_MS = 1L; + public static final long VAL_DEF_CONNECT_CLOSE_DELAY_MS = 500L; + // request timeout in milliseconds + public static final long VAL_DEF_REQUEST_TIMEOUT_MS = 10000L; + public static final long VAL_MIN_REQUEST_TIMEOUT_MS = 1L; public static final int DEFAULT_SEND_BUFFER_SIZE = 16777216; public static final int DEFAULT_RECEIVE_BUFFER_SIZE = 16777216; diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java index 9e2c8c06b5..153c43b8db 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java @@ -230,9 +230,8 @@ private String attemptSendMessageIndex(Function sendOperation) { return sendIndexResult; } - public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID, - long timeout, TimeUnit timeUnit) { - return sendMessage(body, groupId, streamId, dt, msgUUID, timeout, timeUnit, false); + public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID) { + return sendMessage(body, groupId, streamId, dt, msgUUID, false); } /** @@ -243,13 +242,11 @@ public SendResult sendMessage(byte[] body, String groupId, String streamId, long * @param streamId streamId * @param dt data report timestamp * @param msgUUID msg uuid - * @param timeout - * @param timeUnit * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ * @return SendResult.OK means success */ - public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID, - long timeout, TimeUnit timeUnit, boolean isProxySend) { + public SendResult sendMessage(byte[] body, String groupId, + String streamId, long dt, String msgUUID, boolean isProxySend) { dt = ProxyUtils.covertZeroDt(dt); if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) { return SendResult.INVALID_ATTRIBUTES; @@ -271,8 +268,7 @@ public SendResult sendMessage(byte[] body, String groupId, String streamId, long new EncodeObject(Collections.singletonList(body), msgtype, isCompressEnd, isReport, isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(), groupId, streamId, proxySend); encodeObject.setSupportLF(isSupportLF); - Function sendOperation = (sender) -> sender.syncSendMessage(encodeObject, msgUUID, - timeout, timeUnit); + Function sendOperation = (sender) -> sender.syncSendMessage(encodeObject, msgUUID); return attemptSendMessage(sendOperation); } else if (msgtype == 3 || msgtype == 5) { if (isProxySend) { @@ -286,13 +282,13 @@ public SendResult sendMessage(byte[] body, String groupId, String streamId, long "groupId=" + groupId + "&streamId=" + streamId + "&dt=" + finalDt + "&cp=snappy" + finalProxySend, idGenerator.getNextId(), this.getMsgtype(), - true, groupId), msgUUID, timeout, timeUnit); + true, groupId), msgUUID); } else { sendOperation = (sender) -> sender.syncSendMessage(new EncodeObject(Collections.singletonList(body), "groupId=" + groupId + "&streamId=" + streamId + "&dt=" + finalDt + finalProxySend, idGenerator.getNextId(), this.getMsgtype(), - false, groupId), msgUUID, timeout, timeUnit); + false, groupId), msgUUID); } return attemptSendMessage(sendOperation); @@ -302,8 +298,8 @@ public SendResult sendMessage(byte[] body, String groupId, String streamId, long } public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID, - long timeout, TimeUnit timeUnit, Map extraAttrMap) { - return sendMessage(body, groupId, streamId, dt, msgUUID, timeout, timeUnit, extraAttrMap, false); + Map extraAttrMap) { + return sendMessage(body, groupId, streamId, dt, msgUUID, extraAttrMap, false); } /** @@ -314,14 +310,12 @@ public SendResult sendMessage(byte[] body, String groupId, String streamId, long * @param streamId streamId * @param dt data report timestamp * @param msgUUID msg uuid - * @param timeout - * @param timeUnit * @param extraAttrMap extra attributes * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ * @return SendResult.OK means success */ public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID, - long timeout, TimeUnit timeUnit, Map extraAttrMap, boolean isProxySend) { + Map extraAttrMap, boolean isProxySend) { dt = ProxyUtils.covertZeroDt(dt); if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt) || !ProxyUtils.isAttrKeysValid(extraAttrMap)) { @@ -345,8 +339,7 @@ public SendResult sendMessage(byte[] body, String groupId, String streamId, long isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(), groupId, streamId, attrs.toString()); encodeObject.setSupportLF(isSupportLF); - Function sendOperation = (sender) -> sender.syncSendMessage(encodeObject, msgUUID, - timeout, timeUnit); + Function sendOperation = (sender) -> sender.syncSendMessage(encodeObject, msgUUID); return attemptSendMessage(sendOperation); } else if (msgtype == 3 || msgtype == 5) { attrs.append("&groupId=").append(groupId).append("&streamId=").append(streamId).append("&dt=").append(dt); @@ -355,13 +348,14 @@ public SendResult sendMessage(byte[] body, String groupId, String streamId, long Function sendOperation = (sender) -> sender.syncSendMessage( new EncodeObject(Collections.singletonList(body), attrs.toString(), idGenerator.getNextId(), this.getMsgtype(), true, groupId), - msgUUID, timeout, timeUnit); + msgUUID); return attemptSendMessage(sendOperation); } else { Function sendOperation = (sender) -> sender.syncSendMessage( new EncodeObject(Collections.singletonList(body), - attrs.toString(), idGenerator.getNextId(), this.getMsgtype(), false, groupId), - msgUUID, timeout, timeUnit); + attrs.toString(), idGenerator.getNextId(), + this.getMsgtype(), false, groupId), + msgUUID); return attemptSendMessage(sendOperation); } } @@ -369,9 +363,8 @@ public SendResult sendMessage(byte[] body, String groupId, String streamId, long } - public SendResult sendMessage(List bodyList, String groupId, String streamId, long dt, String msgUUID, - long timeout, TimeUnit timeUnit) { - return sendMessage(bodyList, groupId, streamId, dt, msgUUID, timeout, timeUnit, false); + public SendResult sendMessage(List bodyList, String groupId, String streamId, long dt, String msgUUID) { + return sendMessage(bodyList, groupId, streamId, dt, msgUUID, false); } /** @@ -382,13 +375,11 @@ public SendResult sendMessage(List bodyList, String groupId, String stre * @param streamId streamId * @param dt data report timestamp * @param msgUUID msg uuid - * @param timeout - * @param timeUnit * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ * @return SendResult.OK means success */ - public SendResult sendMessage(List bodyList, String groupId, String streamId, long dt, String msgUUID, - long timeout, TimeUnit timeUnit, boolean isProxySend) { + public SendResult sendMessage(List bodyList, + String groupId, String streamId, long dt, String msgUUID, boolean isProxySend) { dt = ProxyUtils.covertZeroDt(dt); if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)) { return SendResult.INVALID_ATTRIBUTES; @@ -408,8 +399,7 @@ public SendResult sendMessage(List bodyList, String groupId, String stre isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(), groupId, streamId, proxySend); encodeObject.setSupportLF(isSupportLF); - Function sendOperation = (sender) -> sender.syncSendMessage(encodeObject, msgUUID, - timeout, timeUnit); + Function sendOperation = (sender) -> sender.syncSendMessage(encodeObject, msgUUID); return attemptSendMessage(sendOperation); } else if (msgtype == 3 || msgtype == 5) { if (isProxySend) { @@ -422,14 +412,12 @@ public SendResult sendMessage(List bodyList, String groupId, String stre sendOperation = (sender) -> sender.syncSendMessage(new EncodeObject(bodyList, "groupId=" + groupId + "&streamId=" + streamId + "&dt=" + finalDt + "&cp=snappy" + "&cnt=" + bodyList.size() + finalProxySend, - idGenerator.getNextId(), this.getMsgtype(), - true, groupId), msgUUID, timeout, timeUnit); + idGenerator.getNextId(), this.getMsgtype(), true, groupId), msgUUID); } else { sendOperation = (sender) -> sender.syncSendMessage(new EncodeObject(bodyList, "groupId=" + groupId + "&streamId=" + streamId + "&dt=" + finalDt + "&cnt=" + bodyList.size() + finalProxySend, - idGenerator.getNextId(), this.getMsgtype(), - false, groupId), msgUUID, timeout, timeUnit); + idGenerator.getNextId(), this.getMsgtype(), false, groupId), msgUUID); } return attemptSendMessage(sendOperation); } @@ -437,8 +425,32 @@ public SendResult sendMessage(List bodyList, String groupId, String stre } public SendResult sendMessage(List bodyList, String groupId, String streamId, long dt, - String msgUUID, long timeout, TimeUnit timeUnit, Map extraAttrMap) { - return sendMessage(bodyList, groupId, streamId, dt, msgUUID, timeout, timeUnit, extraAttrMap, false); + String msgUUID, Map extraAttrMap) { + return sendMessage(bodyList, groupId, streamId, dt, msgUUID, extraAttrMap, false); + } + + @Override + public void asyncSendMessage(SendMessageCallback callback, byte[] body, String groupId, String streamId, long dt, + String msgUUID, Map extraAttrMap) throws ProxysdkException { + + } + + @Override + public void asyncSendMessage(SendMessageCallback callback, byte[] body, String groupId, String streamId, long dt, + String msgUUID) throws ProxysdkException { + + } + + @Override + public void asyncSendMessage(SendMessageCallback callback, List bodyList, String groupId, String streamId, + long dt, String msgUUID) throws ProxysdkException { + + } + + @Override + public void asyncSendMessage(SendMessageCallback callback, List bodyList, String groupId, String streamId, + long dt, String msgUUID, Map extraAttrMap) throws ProxysdkException { + } /** @@ -449,14 +461,12 @@ public SendResult sendMessage(List bodyList, String groupId, String stre * @param streamId streamId * @param dt data report timestamp * @param msgUUID msg uuid - * @param timeout - * @param timeUnit * @param extraAttrMap extra attributes * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ * @return SendResult.OK means success */ public SendResult sendMessage(List bodyList, String groupId, String streamId, long dt, - String msgUUID, long timeout, TimeUnit timeUnit, Map extraAttrMap, boolean isProxySend) { + String msgUUID, Map extraAttrMap, boolean isProxySend) { dt = ProxyUtils.covertZeroDt(dt); if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt) || !ProxyUtils.isAttrKeysValid( extraAttrMap)) { @@ -476,8 +486,7 @@ public SendResult sendMessage(List bodyList, String groupId, String stre isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(), groupId, streamId, attrs.toString()); encodeObject.setSupportLF(isSupportLF); - Function sendOperation = (sender) -> sender.syncSendMessage(encodeObject, msgUUID, - timeout, timeUnit); + Function sendOperation = (sender) -> sender.syncSendMessage(encodeObject, msgUUID); return attemptSendMessage(sendOperation); } else if (msgtype == 3 || msgtype == 5) { attrs.append("&groupId=").append(groupId).append("&streamId=").append(streamId) @@ -486,13 +495,12 @@ public SendResult sendMessage(List bodyList, String groupId, String stre attrs.append("&cp=snappy"); Function sendOperation = (sender) -> sender.syncSendMessage(new EncodeObject(bodyList, attrs.toString(), - idGenerator.getNextId(), this.getMsgtype(), true, groupId), msgUUID, timeout, timeUnit); + idGenerator.getNextId(), this.getMsgtype(), true, groupId), msgUUID); return attemptSendMessage(sendOperation); } else { Function sendOperation = (sender) -> sender.syncSendMessage(new EncodeObject(bodyList, attrs.toString(), - idGenerator.getNextId(), this.getMsgtype(), false, groupId), msgUUID, timeout, - timeUnit); + idGenerator.getNextId(), this.getMsgtype(), false, groupId), msgUUID); return attemptSendMessage(sendOperation); } } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java index 1b18096229..e980e65974 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java @@ -23,7 +23,6 @@ import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; public interface MessageSender { @@ -33,8 +32,7 @@ public interface MessageSender { * @param body The data will be sent * */ - SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID, - long timeout, TimeUnit timeUnit); + SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID); /** * This method provides a synchronized function which you want to send data without packing @@ -45,8 +43,8 @@ SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, St * @param extraAttrMap The attributes you want to add, * and each element of extraAttrMap contains a pair like attrKey,attrValue */ - SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID, - long timeout, TimeUnit timeUnit, Map extraAttrMap); + SendResult sendMessage(byte[] body, String groupId, + String streamId, long dt, String msgUUID, Map extraAttrMap); /** * This method provides a synchronized function which you want to send data with packing @@ -54,8 +52,7 @@ SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, St * * @param bodyList The data will be sent,which is a collection consisting of byte arrays */ - SendResult sendMessage(List bodyList, String groupId, String streamId, long dt, String msgUUID, - long timeout, TimeUnit timeUnit); + SendResult sendMessage(List bodyList, String groupId, String streamId, long dt, String msgUUID); /** * This method provides a synchronized function which you want to send data with packing @@ -66,8 +63,8 @@ SendResult sendMessage(List bodyList, String groupId, String streamId, l * @param extraAttrMap The attributes you want to add, * and each element of extraAttrMap contains a pair like attrKey,attrValue */ - SendResult sendMessage(List bodyList, String groupId, String streamId, long dt, String msgUUID, - long timeout, TimeUnit timeUnit, Map extraAttrMap); + SendResult sendMessage(List bodyList, String groupId, + String streamId, long dt, String msgUUID, Map extraAttrMap); /** * This method provides a synchronized function which you want to send data without packing @@ -80,7 +77,6 @@ SendResult sendMessage(List bodyList, String groupId, String streamId, l */ void asyncSendMessage(SendMessageCallback callback, byte[] body, String groupId, String streamId, long dt, String msgUUID, - long timeout, TimeUnit timeUnit, Map extraAttrMap) throws ProxysdkException; /** @@ -91,8 +87,7 @@ void asyncSendMessage(SendMessageCallback callback, * @param body The data will be sent */ void asyncSendMessage(SendMessageCallback callback, - byte[] body, String groupId, String streamId, long dt, String msgUUID, - long timeout, TimeUnit timeUnit) throws ProxysdkException; + byte[] body, String groupId, String streamId, long dt, String msgUUID) throws ProxysdkException; /** * This method provides an asynchronized function which you want to send data with packing @@ -101,8 +96,7 @@ void asyncSendMessage(SendMessageCallback callback, * @param bodyList The data will be sent,which is a collection consisting of byte arrays */ void asyncSendMessage(SendMessageCallback callback, - List bodyList, String groupId, String streamId, long dt, String msgUUID, - long timeout, TimeUnit timeUnit) throws ProxysdkException; + List bodyList, String groupId, String streamId, long dt, String msgUUID) throws ProxysdkException; /** * This method provides an asynchronized function which you want to send data with packing @@ -115,7 +109,6 @@ void asyncSendMessage(SendMessageCallback callback, */ void asyncSendMessage(SendMessageCallback callback, List bodyList, String groupId, String streamId, long dt, String msgUUID, - long timeout, TimeUnit timeUnit, Map extraAttrMap) throws ProxysdkException; /** diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java index c3253805cb..3338d866c2 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java @@ -59,14 +59,18 @@ public class ProxyClientConfig { private MetricConfig metricConfig = new MetricConfig(); private int managerConnectionTimeout = 10000; + // http socket timeout in milliseconds + private int managerSocketTimeout = 30 * 1000; + private boolean readProxyIPFromLocal = false; - /** - * Default connection, handshake, and initial request timeout in milliseconds. - */ - private long connectTimeoutMillis; - private long requestTimeoutMillis; - private int managerSocketTimeout = 30 * 1000; + // connect timeout in milliseconds + private long connectTimeoutMs = ConfigConstants.VAL_DEF_CONNECT_TIMEOUT_MS; + // request timeout in milliseconds + private long requestTimeoutMs = ConfigConstants.VAL_DEF_REQUEST_TIMEOUT_MS; + // connect close wait period in milliseconds + private long conCloseWaitPeriodMs = + ConfigConstants.VAL_DEF_REQUEST_TIMEOUT_MS + ConfigConstants.VAL_DEF_CONNECT_CLOSE_DELAY_MS; // configuration for http client // whether discard old metric when cache is full. @@ -117,8 +121,6 @@ public ProxyClientConfig(String localHost, boolean requestByHttp, String manager this.proxyUpdateIntervalMinutes = ConfigConstants.PROXY_UPDATE_INTERVAL_MINUTES; this.proxyHttpUpdateIntervalMinutes = ConfigConstants.PROXY_HTTP_UPDATE_INTERVAL_MINUTES; this.proxyUpdateMaxRetry = ConfigConstants.PROXY_UPDATE_MAX_RETRY; - this.connectTimeoutMillis = ConfigConstants.DEFAULT_CONNECT_TIMEOUT_MILLIS; - this.setRequestTimeoutMillis(ConfigConstants.DEFAULT_REQUEST_TIMEOUT_MILLIS); this.authSecretId = authSecretId; this.authSecretKey = authSecretKey; this.loadBalance = loadBalance; @@ -148,8 +150,6 @@ public ProxyClientConfig(String managerAddress, String inlongGroupId, String aut this.proxyUpdateIntervalMinutes = ConfigConstants.PROXY_UPDATE_INTERVAL_MINUTES; this.proxyHttpUpdateIntervalMinutes = ConfigConstants.PROXY_HTTP_UPDATE_INTERVAL_MINUTES; this.proxyUpdateMaxRetry = ConfigConstants.PROXY_UPDATE_MAX_RETRY; - this.connectTimeoutMillis = ConfigConstants.DEFAULT_CONNECT_TIMEOUT_MILLIS; - this.setRequestTimeoutMillis(ConfigConstants.DEFAULT_REQUEST_TIMEOUT_MILLIS); this.authSecretId = authSecretId; this.authSecretKey = authSecretKey; this.loadBalance = loadBalance; @@ -299,20 +299,36 @@ public void setProxyUpdateMaxRetry(int proxyUpdateMaxRetry) { this.proxyUpdateMaxRetry = proxyUpdateMaxRetry; } - public long getConnectTimeoutMillis() { - return connectTimeoutMillis; + public long getConnectTimeoutMs() { + return connectTimeoutMs; + } + + public void setConnectTimeoutMs(long connectTimeoutMs) { + if (connectTimeoutMs >= ConfigConstants.VAL_MIN_CONNECT_TIMEOUT_MS) { + this.connectTimeoutMs = connectTimeoutMs; + } } - public void setConnectTimeoutMillis(long connectTimeoutMillis) { - this.connectTimeoutMillis = connectTimeoutMillis; + public long getRequestTimeoutMs() { + return requestTimeoutMs; } - public long getRequestTimeoutMillis() { - return requestTimeoutMillis; + public void setRequestTimeoutMs(long requestTimeoutMs) { + if (requestTimeoutMs >= ConfigConstants.VAL_MIN_REQUEST_TIMEOUT_MS) { + this.requestTimeoutMs = requestTimeoutMs; + this.conCloseWaitPeriodMs = + this.requestTimeoutMs + ConfigConstants.VAL_DEF_CONNECT_CLOSE_DELAY_MS; + } } - public void setRequestTimeoutMillis(long requestTimeoutMillis) { - this.requestTimeoutMillis = requestTimeoutMillis; + public long getConCloseWaitPeriodMs() { + return conCloseWaitPeriodMs; + } + + public void setConCloseWaitPeriodMs(long conCloseWaitPeriodMs) { + if (conCloseWaitPeriodMs >= 0) { + this.conCloseWaitPeriodMs = conCloseWaitPeriodMs; + } } public String getRsaPubKeyUrl() { diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/SendMsgThread.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/SendMsgThread.java index 4c51696be0..4658bb1a05 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/SendMsgThread.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/SendMsgThread.java @@ -26,7 +26,6 @@ import java.io.BufferedReader; import java.io.FileReader; import java.io.IOException; -import java.util.concurrent.TimeUnit; public class SendMsgThread extends Thread { @@ -49,8 +48,7 @@ public void run() { long startTime = System.currentTimeMillis(); SendResult result = messageSender.sendMessage("hhhh".getBytes("utf8"), - "b_test", "n_test1", 0, String.valueOf(System.currentTimeMillis()), 1, - TimeUnit.MILLISECONDS); + "b_test", "n_test1", 0, String.valueOf(System.currentTimeMillis())); long endTime = System.currentTimeMillis(); if (result == result.OK) { logger.info("this msg is ok time {}", endTime - startTime); diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java index 55b6cf6d99..eda90bdbca 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java @@ -27,7 +27,6 @@ import org.slf4j.LoggerFactory; import java.io.UnsupportedEncodingException; -import java.util.concurrent.TimeUnit; public class TcpClientExample { @@ -75,6 +74,7 @@ public DefaultMessageSender getMessageSender(String localIP, String inLongManage } dataProxyConfig.setReadProxyIPFromLocal(isReadProxyIPFromLocal); dataProxyConfig.setProtocolType(ProtocolType.TCP); + dataProxyConfig.setRequestTimeoutMs(20000L); messageSender = DefaultMessageSender.generateSenderByClusterId(dataProxyConfig); messageSender.setMsgtype(msgType); } catch (Exception e) { @@ -88,7 +88,7 @@ public void sendTcpMessage(DefaultMessageSender sender, String inlongGroupId, SendResult result = null; try { result = sender.sendMessage(messageBody.getBytes("utf8"), inlongGroupId, inlongStreamId, - 0, String.valueOf(dt), 20, TimeUnit.SECONDS); + 0, String.valueOf(dt)); } catch (UnsupportedEncodingException e) { e.printStackTrace(); diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/NettyClient.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/NettyClient.java index cd38e5f895..f529730384 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/NettyClient.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/NettyClient.java @@ -89,7 +89,7 @@ public void operationComplete(ChannelFuture arg0) throws Exception { try { // Wait until the connection is built. - awaitLatch.await(configure.getConnectTimeoutMillis(), + awaitLatch.await(configure.getConnectTimeoutMs(), TimeUnit.MILLISECONDS); } catch (Exception e) { logger.error("create connect exception! {}", e.getMessage()); @@ -123,8 +123,7 @@ public void operationComplete(ChannelFuture arg0) } }); // Wait until the connection is close. - awaitLatch.await(configure.getRequestTimeoutMillis(), - TimeUnit.MILLISECONDS); + awaitLatch.await(configure.getConCloseWaitPeriodMs(), TimeUnit.MILLISECONDS); // Return if close this connection fail. if (!future.isSuccess()) { ret = false; diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java index 50b3105b56..01ac56a53b 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java @@ -173,8 +173,9 @@ public void notifyCallback(Channel channel, String messageId, SendResult result) currentBufferSize.decrementAndGet(); } - private SendResult syncSendInternalMessage(NettyClient client, EncodeObject encodeObject, String msgUUID, - long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException { + private SendResult syncSendInternalMessage(NettyClient client, + EncodeObject encodeObject, String msgUUID) + throws ExecutionException, InterruptedException, TimeoutException { if (client == null) { return SendResult.NO_CONNECTION; } @@ -204,11 +205,12 @@ private SendResult syncSendInternalMessage(NettyClient client, EncodeObject enco encodeObject.setEncryptEntry(false, null, null); } encodeObject.setMsgUUID(msgUUID); - SyncMessageCallable callable = new SyncMessageCallable(client, encodeObject, timeout, timeUnit); + SyncMessageCallable callable = new SyncMessageCallable(client, encodeObject, + configure.getRequestTimeoutMs(), TimeUnit.MILLISECONDS); syncCallables.put(encodeObject.getMessageId(), callable); Future future = threadPool.submit(callable); - return future.get(timeout, timeUnit); + return future.get(configure.getRequestTimeoutMs(), TimeUnit.MILLISECONDS); } /** @@ -217,11 +219,9 @@ private SendResult syncSendInternalMessage(NettyClient client, EncodeObject enco * * @param encodeObject * @param msgUUID - * @param timeout - * @param timeUnit * @return */ - public SendResult syncSendMessage(EncodeObject encodeObject, String msgUUID, long timeout, TimeUnit timeUnit) { + public SendResult syncSendMessage(EncodeObject encodeObject, String msgUUID) { if (configure.isEnableMetric()) { metricWorker.recordNumByKey(encodeObject.getMessageId(), encodeObject.getGroupId(), encodeObject.getStreamId(), Utils.getLocalIp(), encodeObject.getDt(), @@ -230,7 +230,7 @@ public SendResult syncSendMessage(EncodeObject encodeObject, String msgUUID, lon NettyClient client = clientMgr.getClient(clientMgr.getLoadBalance(), encodeObject); SendResult message = null; try { - message = syncSendInternalMessage(client, encodeObject, msgUUID, timeout, timeUnit); + message = syncSendInternalMessage(client, encodeObject, msgUUID); } catch (InterruptedException e) { // TODO Auto-generated catch block LOGGER.error("send message error {} ", getExceptionStack(e)); diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/PbProtocolMessageSender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/PbProtocolMessageSender.java index f0bd45d13e..fc584f5a5a 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/PbProtocolMessageSender.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/PbProtocolMessageSender.java @@ -46,13 +46,16 @@ public class PbProtocolMessageSender implements MessageSender, Configurable { public static final Logger LOG = LoggerFactory.getLogger(PbProtocolMessageSender.class); - + private static final String KEY_REQUEST_TIMEOUT_MS = "request.timeout.ms"; + private static final long VAL_DEF_REQUEST_TIMEOUT_MS = 20000L; + private static final long VAL_MIN_REQUEST_TIMEOUT_MS = 1L; private String name; private String localIp; private LifecycleState lifecycleState; private Context context; private BufferQueueChannel channel; private ProxySdkSink sink; + private long requestTimeoutMs = VAL_DEF_REQUEST_TIMEOUT_MS; /** * Constructor @@ -101,6 +104,10 @@ public void configure(Context context) { this.sink.setName(name + "-sink"); this.sink.configure(context); this.sink.setChannel(channel); + long tmpTimeoutMs = context.getLong(KEY_REQUEST_TIMEOUT_MS, VAL_DEF_REQUEST_TIMEOUT_MS); + if (tmpTimeoutMs >= VAL_MIN_REQUEST_TIMEOUT_MS) { + this.requestTimeoutMs = tmpTimeoutMs; + } } /** @@ -179,14 +186,11 @@ public Context getContext() { * @param streamId * @param dt * @param msgUUID - * @param timeout - * @param timeUnit * @return SendResult */ @Override - public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID, long timeout, - TimeUnit timeUnit) { - return this.sendMessage(body, groupId, streamId, dt, msgUUID, timeout, timeUnit, null); + public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID) { + return this.sendMessage(body, groupId, streamId, dt, msgUUID, null); } /** @@ -197,14 +201,12 @@ public SendResult sendMessage(byte[] body, String groupId, String streamId, long * @param streamId * @param dt * @param msgUUID - * @param timeout - * @param timeUnit * @param extraAttrMap * @return SendResult */ @Override - public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID, long timeout, - TimeUnit timeUnit, Map extraAttrMap) { + public SendResult sendMessage(byte[] body, String groupId, + String streamId, long dt, String msgUUID, Map extraAttrMap) { // prepare SdkEvent sdkEvent = new SdkEvent(); sdkEvent.setInlongGroupId(groupId); @@ -237,7 +239,7 @@ public void onException(Throwable e) { this.put(profile); // wait try { - boolean success = latch.await(timeout, timeUnit); + boolean success = latch.await(requestTimeoutMs, TimeUnit.MILLISECONDS); if (!success) { refResult.set(SendResult.TIMEOUT); } @@ -256,14 +258,11 @@ public void onException(Throwable e) { * @param streamId * @param dt * @param msgUUID - * @param timeout - * @param timeUnit * @return SendResult */ @Override - public SendResult sendMessage(List bodyList, String groupId, String streamId, long dt, String msgUUID, - long timeout, TimeUnit timeUnit) { - return this.sendMessage(bodyList, groupId, streamId, dt, msgUUID, timeout, timeUnit, null); + public SendResult sendMessage(List bodyList, String groupId, String streamId, long dt, String msgUUID) { + return this.sendMessage(bodyList, groupId, streamId, dt, msgUUID, null); } /** @@ -274,14 +273,12 @@ public SendResult sendMessage(List bodyList, String groupId, String stre * @param streamId * @param dt * @param msgUUID - * @param timeout - * @param timeUnit * @param extraAttrMap * @return SendResult */ @Override - public SendResult sendMessage(List bodyList, String groupId, String streamId, long dt, String msgUUID, - long timeout, TimeUnit timeUnit, Map extraAttrMap) { + public SendResult sendMessage(List bodyList, + String groupId, String streamId, long dt, String msgUUID, Map extraAttrMap) { final AtomicReference refResult = new AtomicReference<>(); CountDownLatch latch = new CountDownLatch(bodyList.size()); // prepare @@ -318,7 +315,7 @@ public void onException(Throwable e) { this.putAll(events); // wait try { - boolean success = latch.await(timeout, timeUnit); + boolean success = latch.await(requestTimeoutMs, TimeUnit.MILLISECONDS); if (!success) { refResult.set(SendResult.TIMEOUT); } @@ -338,14 +335,12 @@ public void onException(Throwable e) { * @param streamId * @param dt * @param msgUUID - * @param timeout - * @param timeUnit * @param extraAttrMap * @throws ProxysdkException */ @Override public void asyncSendMessage(SendMessageCallback callback, byte[] body, String groupId, String streamId, long dt, - String msgUUID, long timeout, TimeUnit timeUnit, Map extraAttrMap) + String msgUUID, Map extraAttrMap) throws ProxysdkException { SdkEvent sdkEvent = new SdkEvent(); sdkEvent.setInlongGroupId(groupId); @@ -369,14 +364,12 @@ public void asyncSendMessage(SendMessageCallback callback, byte[] body, String g * @param streamId * @param dt * @param msgUUID - * @param timeout - * @param timeUnit * @throws ProxysdkException */ @Override - public void asyncSendMessage(SendMessageCallback callback, byte[] body, String groupId, String streamId, long dt, - String msgUUID, long timeout, TimeUnit timeUnit) throws ProxysdkException { - this.asyncSendMessage(callback, body, groupId, streamId, dt, msgUUID, timeout, timeUnit, null); + public void asyncSendMessage(SendMessageCallback callback, byte[] body, + String groupId, String streamId, long dt, String msgUUID) throws ProxysdkException { + this.asyncSendMessage(callback, body, groupId, streamId, dt, msgUUID, null); } /** @@ -388,14 +381,12 @@ public void asyncSendMessage(SendMessageCallback callback, byte[] body, String g * @param streamId * @param dt * @param msgUUID - * @param timeout - * @param timeUnit * @throws ProxysdkException */ @Override - public void asyncSendMessage(SendMessageCallback callback, List bodyList, String groupId, String streamId, - long dt, String msgUUID, long timeout, TimeUnit timeUnit) throws ProxysdkException { - this.asyncSendMessage(callback, bodyList, groupId, streamId, dt, msgUUID, timeout, timeUnit, null); + public void asyncSendMessage(SendMessageCallback callback, List bodyList, + String groupId, String streamId, long dt, String msgUUID) throws ProxysdkException { + this.asyncSendMessage(callback, bodyList, groupId, streamId, dt, msgUUID, null); } /** @@ -407,15 +398,13 @@ public void asyncSendMessage(SendMessageCallback callback, List bodyList * @param streamId * @param dt * @param msgUUID - * @param timeout - * @param timeUnit * @param extraAttrMap * @throws ProxysdkException */ @Override - public void asyncSendMessage(SendMessageCallback callback, List bodyList, String groupId, String streamId, - long dt, String msgUUID, long timeout, TimeUnit timeUnit, Map extraAttrMap) - throws ProxysdkException { + public void asyncSendMessage(SendMessageCallback callback, + List bodyList, String groupId, String streamId, long dt, String msgUUID, + Map extraAttrMap) throws ProxysdkException { List events = new ArrayList<>(bodyList.size()); for (byte[] body : bodyList) { SdkEvent sdkEvent = new SdkEvent(); @@ -445,8 +434,7 @@ public void asyncSendMessage(SendMessageCallback callback, List bodyList @Override public void asyncSendMessage(String inlongGroupId, String inlongStreamId, byte[] body, SendMessageCallback callback) throws ProxysdkException { - this.asyncSendMessage(callback, body, inlongGroupId, inlongStreamId, System.currentTimeMillis(), null, 0L, null, - null); + this.asyncSendMessage(callback, body, inlongGroupId, inlongStreamId, System.currentTimeMillis(), null, null); } /** @@ -461,8 +449,8 @@ public void asyncSendMessage(String inlongGroupId, String inlongStreamId, byte[] @Override public void asyncSendMessage(String inlongGroupId, String inlongStreamId, List bodyList, SendMessageCallback callback) throws ProxysdkException { - this.asyncSendMessage(callback, bodyList, inlongGroupId, inlongStreamId, System.currentTimeMillis(), null, 0L, - null, null); + this.asyncSendMessage(callback, bodyList, inlongGroupId, inlongStreamId, System.currentTimeMillis(), null, + null); } }