Skip to content

Commit

Permalink
[INLONG-11475][SDK] Remove the timeout parameter in the MessageSender…
Browse files Browse the repository at this point in the history
… class functions (#11476)

Co-authored-by: gosonzhang <[email protected]>
  • Loading branch information
gosonzhang and gosonzhang authored Nov 11, 2024
1 parent 99dec05 commit 6915519
Show file tree
Hide file tree
Showing 12 changed files with 148 additions and 148 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME;
Expand Down Expand Up @@ -168,7 +167,7 @@ private void doSendStatusMsg(DefaultMessageSender sender) {
INLONG_AGENT_SYSTEM,
INLONG_AGENT_STATUS,
AgentUtils.getCurrentTime(),
"", 30, TimeUnit.SECONDS);
"");
if (ret != SendResult.OK) {
LOGGER.error("send status failed: ret {}", ret);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME;
import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_TAG;
Expand Down Expand Up @@ -135,7 +134,7 @@ private void doSendStaticMsg(DefaultMessageSender sender) {
INLONG_AGENT_SYSTEM,
INLONG_FILE_STATIC,
AgentUtils.getCurrentTime(),
"", 30, TimeUnit.SECONDS);
"");
if (ret != SendResult.OK) {
LOGGER.error("send static failed: ret {}", ret);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ private void createMessageSender() {
proxyClientConfig.setAliveConnections(CommonConstants.DEFAULT_PROXY_ALIVE_CONNECTION_NUM);
proxyClientConfig.setIoThreadNum(CommonConstants.DEFAULT_PROXY_CLIENT_IO_THREAD_NUM);
proxyClientConfig.setProtocolType(ProtocolType.TCP);
proxyClientConfig.setRequestTimeoutMs(30000L);
ThreadFactory SHARED_FACTORY = new DefaultThreadFactory("agent-sender-manager-heartbeat",
Thread.currentThread().isDaemon());
sender = new DefaultMessageSender(proxyClientConfig, SHARED_FACTORY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.inlong.sdk.dataproxy;

import java.util.concurrent.TimeUnit;

public class ConfigConstants {

public static final String PROXY_SDK_VERSION = "1.2.11";
Expand Down Expand Up @@ -49,12 +47,13 @@ public class ConfigConstants {

public static final int MAX_LINE_CNT = 30;

/* Default connection,connect timeout in milliseconds. */
public static final long DEFAULT_CONNECT_TIMEOUT_MILLIS =
TimeUnit.MILLISECONDS.convert(40, TimeUnit.SECONDS);

public static final long DEFAULT_REQUEST_TIMEOUT_MILLIS =
TimeUnit.MILLISECONDS.convert(40, TimeUnit.SECONDS);
// connection timeout in milliseconds
public static final long VAL_DEF_CONNECT_TIMEOUT_MS = 20000L;
public static final long VAL_MIN_CONNECT_TIMEOUT_MS = 1L;
public static final long VAL_DEF_CONNECT_CLOSE_DELAY_MS = 500L;
// request timeout in milliseconds
public static final long VAL_DEF_REQUEST_TIMEOUT_MS = 10000L;
public static final long VAL_MIN_REQUEST_TIMEOUT_MS = 1L;

public static final int DEFAULT_SEND_BUFFER_SIZE = 16777216;
public static final int DEFAULT_RECEIVE_BUFFER_SIZE = 16777216;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,8 @@ private String attemptSendMessageIndex(Function<Sender, String> sendOperation) {
return sendIndexResult;
}

public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID,
long timeout, TimeUnit timeUnit) {
return sendMessage(body, groupId, streamId, dt, msgUUID, timeout, timeUnit, false);
public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID) {
return sendMessage(body, groupId, streamId, dt, msgUUID, false);
}

/**
Expand All @@ -243,13 +242,11 @@ public SendResult sendMessage(byte[] body, String groupId, String streamId, long
* @param streamId streamId
* @param dt data report timestamp
* @param msgUUID msg uuid
* @param timeout
* @param timeUnit
* @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ
* @return SendResult.OK means success
*/
public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID,
long timeout, TimeUnit timeUnit, boolean isProxySend) {
public SendResult sendMessage(byte[] body, String groupId,
String streamId, long dt, String msgUUID, boolean isProxySend) {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
return SendResult.INVALID_ATTRIBUTES;
Expand All @@ -271,8 +268,7 @@ public SendResult sendMessage(byte[] body, String groupId, String streamId, long
new EncodeObject(Collections.singletonList(body), msgtype, isCompressEnd, isReport,
isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(), groupId, streamId, proxySend);
encodeObject.setSupportLF(isSupportLF);
Function<Sender, SendResult> sendOperation = (sender) -> sender.syncSendMessage(encodeObject, msgUUID,
timeout, timeUnit);
Function<Sender, SendResult> sendOperation = (sender) -> sender.syncSendMessage(encodeObject, msgUUID);
return attemptSendMessage(sendOperation);
} else if (msgtype == 3 || msgtype == 5) {
if (isProxySend) {
Expand All @@ -286,13 +282,13 @@ public SendResult sendMessage(byte[] body, String groupId, String streamId, long
"groupId=" + groupId + "&streamId=" + streamId + "&dt=" + finalDt + "&cp=snappy"
+ finalProxySend,
idGenerator.getNextId(), this.getMsgtype(),
true, groupId), msgUUID, timeout, timeUnit);
true, groupId), msgUUID);
} else {
sendOperation = (sender) -> sender.syncSendMessage(new EncodeObject(Collections.singletonList(body),
"groupId=" + groupId + "&streamId=" + streamId + "&dt=" + finalDt
+ finalProxySend,
idGenerator.getNextId(), this.getMsgtype(),
false, groupId), msgUUID, timeout, timeUnit);
false, groupId), msgUUID);

}
return attemptSendMessage(sendOperation);
Expand All @@ -302,8 +298,8 @@ public SendResult sendMessage(byte[] body, String groupId, String streamId, long
}

public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID,
long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) {
return sendMessage(body, groupId, streamId, dt, msgUUID, timeout, timeUnit, extraAttrMap, false);
Map<String, String> extraAttrMap) {
return sendMessage(body, groupId, streamId, dt, msgUUID, extraAttrMap, false);
}

/**
Expand All @@ -314,14 +310,12 @@ public SendResult sendMessage(byte[] body, String groupId, String streamId, long
* @param streamId streamId
* @param dt data report timestamp
* @param msgUUID msg uuid
* @param timeout
* @param timeUnit
* @param extraAttrMap extra attributes
* @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ
* @return SendResult.OK means success
*/
public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID,
long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap, boolean isProxySend) {
Map<String, String> extraAttrMap, boolean isProxySend) {

dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt) || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
Expand All @@ -345,8 +339,7 @@ public SendResult sendMessage(byte[] body, String groupId, String streamId, long
isGroupIdTransfer, dt / 1000,
idGenerator.getNextInt(), groupId, streamId, attrs.toString());
encodeObject.setSupportLF(isSupportLF);
Function<Sender, SendResult> sendOperation = (sender) -> sender.syncSendMessage(encodeObject, msgUUID,
timeout, timeUnit);
Function<Sender, SendResult> sendOperation = (sender) -> sender.syncSendMessage(encodeObject, msgUUID);
return attemptSendMessage(sendOperation);
} else if (msgtype == 3 || msgtype == 5) {
attrs.append("&groupId=").append(groupId).append("&streamId=").append(streamId).append("&dt=").append(dt);
Expand All @@ -355,23 +348,23 @@ public SendResult sendMessage(byte[] body, String groupId, String streamId, long
Function<Sender, SendResult> sendOperation = (sender) -> sender.syncSendMessage(
new EncodeObject(Collections.singletonList(body),
attrs.toString(), idGenerator.getNextId(), this.getMsgtype(), true, groupId),
msgUUID, timeout, timeUnit);
msgUUID);
return attemptSendMessage(sendOperation);
} else {
Function<Sender, SendResult> sendOperation = (sender) -> sender.syncSendMessage(
new EncodeObject(Collections.singletonList(body),
attrs.toString(), idGenerator.getNextId(), this.getMsgtype(), false, groupId),
msgUUID, timeout, timeUnit);
attrs.toString(), idGenerator.getNextId(),
this.getMsgtype(), false, groupId),
msgUUID);
return attemptSendMessage(sendOperation);
}
}
return null;

}

public SendResult sendMessage(List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID,
long timeout, TimeUnit timeUnit) {
return sendMessage(bodyList, groupId, streamId, dt, msgUUID, timeout, timeUnit, false);
public SendResult sendMessage(List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID) {
return sendMessage(bodyList, groupId, streamId, dt, msgUUID, false);
}

/**
Expand All @@ -382,13 +375,11 @@ public SendResult sendMessage(List<byte[]> bodyList, String groupId, String stre
* @param streamId streamId
* @param dt data report timestamp
* @param msgUUID msg uuid
* @param timeout
* @param timeUnit
* @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ
* @return SendResult.OK means success
*/
public SendResult sendMessage(List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID,
long timeout, TimeUnit timeUnit, boolean isProxySend) {
public SendResult sendMessage(List<byte[]> bodyList,
String groupId, String streamId, long dt, String msgUUID, boolean isProxySend) {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)) {
return SendResult.INVALID_ATTRIBUTES;
Expand All @@ -408,8 +399,7 @@ public SendResult sendMessage(List<byte[]> bodyList, String groupId, String stre
isGroupIdTransfer, dt / 1000,
idGenerator.getNextInt(), groupId, streamId, proxySend);
encodeObject.setSupportLF(isSupportLF);
Function<Sender, SendResult> sendOperation = (sender) -> sender.syncSendMessage(encodeObject, msgUUID,
timeout, timeUnit);
Function<Sender, SendResult> sendOperation = (sender) -> sender.syncSendMessage(encodeObject, msgUUID);
return attemptSendMessage(sendOperation);
} else if (msgtype == 3 || msgtype == 5) {
if (isProxySend) {
Expand All @@ -422,23 +412,45 @@ public SendResult sendMessage(List<byte[]> bodyList, String groupId, String stre
sendOperation = (sender) -> sender.syncSendMessage(new EncodeObject(bodyList,
"groupId=" + groupId + "&streamId=" + streamId + "&dt=" + finalDt + "&cp=snappy" + "&cnt="
+ bodyList.size() + finalProxySend,
idGenerator.getNextId(), this.getMsgtype(),
true, groupId), msgUUID, timeout, timeUnit);
idGenerator.getNextId(), this.getMsgtype(), true, groupId), msgUUID);
} else {
sendOperation = (sender) -> sender.syncSendMessage(new EncodeObject(bodyList,
"groupId=" + groupId + "&streamId=" + streamId + "&dt=" + finalDt + "&cnt=" + bodyList.size()
+ finalProxySend,
idGenerator.getNextId(), this.getMsgtype(),
false, groupId), msgUUID, timeout, timeUnit);
idGenerator.getNextId(), this.getMsgtype(), false, groupId), msgUUID);
}
return attemptSendMessage(sendOperation);
}
return null;
}

public SendResult sendMessage(List<byte[]> bodyList, String groupId, String streamId, long dt,
String msgUUID, long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) {
return sendMessage(bodyList, groupId, streamId, dt, msgUUID, timeout, timeUnit, extraAttrMap, false);
String msgUUID, Map<String, String> extraAttrMap) {
return sendMessage(bodyList, groupId, streamId, dt, msgUUID, extraAttrMap, false);
}

@Override
public void asyncSendMessage(SendMessageCallback callback, byte[] body, String groupId, String streamId, long dt,
String msgUUID, Map<String, String> extraAttrMap) throws ProxysdkException {

}

@Override
public void asyncSendMessage(SendMessageCallback callback, byte[] body, String groupId, String streamId, long dt,
String msgUUID) throws ProxysdkException {

}

@Override
public void asyncSendMessage(SendMessageCallback callback, List<byte[]> bodyList, String groupId, String streamId,
long dt, String msgUUID) throws ProxysdkException {

}

@Override
public void asyncSendMessage(SendMessageCallback callback, List<byte[]> bodyList, String groupId, String streamId,
long dt, String msgUUID, Map<String, String> extraAttrMap) throws ProxysdkException {

}

/**
Expand All @@ -449,14 +461,12 @@ public SendResult sendMessage(List<byte[]> bodyList, String groupId, String stre
* @param streamId streamId
* @param dt data report timestamp
* @param msgUUID msg uuid
* @param timeout
* @param timeUnit
* @param extraAttrMap extra attributes
* @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ
* @return SendResult.OK means success
*/
public SendResult sendMessage(List<byte[]> bodyList, String groupId, String streamId, long dt,
String msgUUID, long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap, boolean isProxySend) {
String msgUUID, Map<String, String> extraAttrMap, boolean isProxySend) {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt) || !ProxyUtils.isAttrKeysValid(
extraAttrMap)) {
Expand All @@ -476,8 +486,7 @@ public SendResult sendMessage(List<byte[]> bodyList, String groupId, String stre
isGroupIdTransfer, dt / 1000,
idGenerator.getNextInt(), groupId, streamId, attrs.toString());
encodeObject.setSupportLF(isSupportLF);
Function<Sender, SendResult> sendOperation = (sender) -> sender.syncSendMessage(encodeObject, msgUUID,
timeout, timeUnit);
Function<Sender, SendResult> sendOperation = (sender) -> sender.syncSendMessage(encodeObject, msgUUID);
return attemptSendMessage(sendOperation);
} else if (msgtype == 3 || msgtype == 5) {
attrs.append("&groupId=").append(groupId).append("&streamId=").append(streamId)
Expand All @@ -486,13 +495,12 @@ public SendResult sendMessage(List<byte[]> bodyList, String groupId, String stre
attrs.append("&cp=snappy");
Function<Sender, SendResult> sendOperation =
(sender) -> sender.syncSendMessage(new EncodeObject(bodyList, attrs.toString(),
idGenerator.getNextId(), this.getMsgtype(), true, groupId), msgUUID, timeout, timeUnit);
idGenerator.getNextId(), this.getMsgtype(), true, groupId), msgUUID);
return attemptSendMessage(sendOperation);
} else {
Function<Sender, SendResult> sendOperation =
(sender) -> sender.syncSendMessage(new EncodeObject(bodyList, attrs.toString(),
idGenerator.getNextId(), this.getMsgtype(), false, groupId), msgUUID, timeout,
timeUnit);
idGenerator.getNextId(), this.getMsgtype(), false, groupId), msgUUID);
return attemptSendMessage(sendOperation);
}
}
Expand Down
Loading

0 comments on commit 6915519

Please sign in to comment.