Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-11475][SDK] Remove the timeout parameter in the MessageSender class functions #11476

Merged
merged 1 commit into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME;
Expand Down Expand Up @@ -168,7 +167,7 @@ private void doSendStatusMsg(DefaultMessageSender sender) {
INLONG_AGENT_SYSTEM,
INLONG_AGENT_STATUS,
AgentUtils.getCurrentTime(),
"", 30, TimeUnit.SECONDS);
"");
if (ret != SendResult.OK) {
LOGGER.error("send status failed: ret {}", ret);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME;
import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_TAG;
Expand Down Expand Up @@ -135,7 +134,7 @@ private void doSendStaticMsg(DefaultMessageSender sender) {
INLONG_AGENT_SYSTEM,
INLONG_FILE_STATIC,
AgentUtils.getCurrentTime(),
"", 30, TimeUnit.SECONDS);
"");
if (ret != SendResult.OK) {
LOGGER.error("send static failed: ret {}", ret);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ private void createMessageSender() {
proxyClientConfig.setAliveConnections(CommonConstants.DEFAULT_PROXY_ALIVE_CONNECTION_NUM);
proxyClientConfig.setIoThreadNum(CommonConstants.DEFAULT_PROXY_CLIENT_IO_THREAD_NUM);
proxyClientConfig.setProtocolType(ProtocolType.TCP);
proxyClientConfig.setRequestTimeoutMs(30000L);
ThreadFactory SHARED_FACTORY = new DefaultThreadFactory("agent-sender-manager-heartbeat",
Thread.currentThread().isDaemon());
sender = new DefaultMessageSender(proxyClientConfig, SHARED_FACTORY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.inlong.sdk.dataproxy;

import java.util.concurrent.TimeUnit;

public class ConfigConstants {

public static final String PROXY_SDK_VERSION = "1.2.11";
Expand Down Expand Up @@ -49,12 +47,13 @@ public class ConfigConstants {

public static final int MAX_LINE_CNT = 30;

/* Default connection,connect timeout in milliseconds. */
public static final long DEFAULT_CONNECT_TIMEOUT_MILLIS =
TimeUnit.MILLISECONDS.convert(40, TimeUnit.SECONDS);

public static final long DEFAULT_REQUEST_TIMEOUT_MILLIS =
TimeUnit.MILLISECONDS.convert(40, TimeUnit.SECONDS);
// connection timeout in milliseconds
public static final long VAL_DEF_CONNECT_TIMEOUT_MS = 20000L;
public static final long VAL_MIN_CONNECT_TIMEOUT_MS = 1L;
public static final long VAL_DEF_CONNECT_CLOSE_DELAY_MS = 500L;
// request timeout in milliseconds
public static final long VAL_DEF_REQUEST_TIMEOUT_MS = 10000L;
public static final long VAL_MIN_REQUEST_TIMEOUT_MS = 1L;

public static final int DEFAULT_SEND_BUFFER_SIZE = 16777216;
public static final int DEFAULT_RECEIVE_BUFFER_SIZE = 16777216;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,8 @@ private String attemptSendMessageIndex(Function<Sender, String> sendOperation) {
return sendIndexResult;
}

public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID,
long timeout, TimeUnit timeUnit) {
return sendMessage(body, groupId, streamId, dt, msgUUID, timeout, timeUnit, false);
public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID) {
return sendMessage(body, groupId, streamId, dt, msgUUID, false);
}

/**
Expand All @@ -243,13 +242,11 @@ public SendResult sendMessage(byte[] body, String groupId, String streamId, long
* @param streamId streamId
* @param dt data report timestamp
* @param msgUUID msg uuid
* @param timeout
* @param timeUnit
* @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ
* @return SendResult.OK means success
*/
public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID,
long timeout, TimeUnit timeUnit, boolean isProxySend) {
public SendResult sendMessage(byte[] body, String groupId,
String streamId, long dt, String msgUUID, boolean isProxySend) {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
return SendResult.INVALID_ATTRIBUTES;
Expand All @@ -271,8 +268,7 @@ public SendResult sendMessage(byte[] body, String groupId, String streamId, long
new EncodeObject(Collections.singletonList(body), msgtype, isCompressEnd, isReport,
isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(), groupId, streamId, proxySend);
encodeObject.setSupportLF(isSupportLF);
Function<Sender, SendResult> sendOperation = (sender) -> sender.syncSendMessage(encodeObject, msgUUID,
timeout, timeUnit);
Function<Sender, SendResult> sendOperation = (sender) -> sender.syncSendMessage(encodeObject, msgUUID);
return attemptSendMessage(sendOperation);
} else if (msgtype == 3 || msgtype == 5) {
if (isProxySend) {
Expand All @@ -286,13 +282,13 @@ public SendResult sendMessage(byte[] body, String groupId, String streamId, long
"groupId=" + groupId + "&streamId=" + streamId + "&dt=" + finalDt + "&cp=snappy"
+ finalProxySend,
idGenerator.getNextId(), this.getMsgtype(),
true, groupId), msgUUID, timeout, timeUnit);
true, groupId), msgUUID);
} else {
sendOperation = (sender) -> sender.syncSendMessage(new EncodeObject(Collections.singletonList(body),
"groupId=" + groupId + "&streamId=" + streamId + "&dt=" + finalDt
+ finalProxySend,
idGenerator.getNextId(), this.getMsgtype(),
false, groupId), msgUUID, timeout, timeUnit);
false, groupId), msgUUID);

}
return attemptSendMessage(sendOperation);
Expand All @@ -302,8 +298,8 @@ public SendResult sendMessage(byte[] body, String groupId, String streamId, long
}

public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID,
long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) {
return sendMessage(body, groupId, streamId, dt, msgUUID, timeout, timeUnit, extraAttrMap, false);
Map<String, String> extraAttrMap) {
return sendMessage(body, groupId, streamId, dt, msgUUID, extraAttrMap, false);
}

/**
Expand All @@ -314,14 +310,12 @@ public SendResult sendMessage(byte[] body, String groupId, String streamId, long
* @param streamId streamId
* @param dt data report timestamp
* @param msgUUID msg uuid
* @param timeout
* @param timeUnit
* @param extraAttrMap extra attributes
* @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ
* @return SendResult.OK means success
*/
public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID,
long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap, boolean isProxySend) {
Map<String, String> extraAttrMap, boolean isProxySend) {

dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt) || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
Expand All @@ -345,8 +339,7 @@ public SendResult sendMessage(byte[] body, String groupId, String streamId, long
isGroupIdTransfer, dt / 1000,
idGenerator.getNextInt(), groupId, streamId, attrs.toString());
encodeObject.setSupportLF(isSupportLF);
Function<Sender, SendResult> sendOperation = (sender) -> sender.syncSendMessage(encodeObject, msgUUID,
timeout, timeUnit);
Function<Sender, SendResult> sendOperation = (sender) -> sender.syncSendMessage(encodeObject, msgUUID);
return attemptSendMessage(sendOperation);
} else if (msgtype == 3 || msgtype == 5) {
attrs.append("&groupId=").append(groupId).append("&streamId=").append(streamId).append("&dt=").append(dt);
Expand All @@ -355,23 +348,23 @@ public SendResult sendMessage(byte[] body, String groupId, String streamId, long
Function<Sender, SendResult> sendOperation = (sender) -> sender.syncSendMessage(
new EncodeObject(Collections.singletonList(body),
attrs.toString(), idGenerator.getNextId(), this.getMsgtype(), true, groupId),
msgUUID, timeout, timeUnit);
msgUUID);
return attemptSendMessage(sendOperation);
} else {
Function<Sender, SendResult> sendOperation = (sender) -> sender.syncSendMessage(
new EncodeObject(Collections.singletonList(body),
attrs.toString(), idGenerator.getNextId(), this.getMsgtype(), false, groupId),
msgUUID, timeout, timeUnit);
attrs.toString(), idGenerator.getNextId(),
this.getMsgtype(), false, groupId),
msgUUID);
return attemptSendMessage(sendOperation);
}
}
return null;

}

public SendResult sendMessage(List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID,
long timeout, TimeUnit timeUnit) {
return sendMessage(bodyList, groupId, streamId, dt, msgUUID, timeout, timeUnit, false);
public SendResult sendMessage(List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID) {
return sendMessage(bodyList, groupId, streamId, dt, msgUUID, false);
}

/**
Expand All @@ -382,13 +375,11 @@ public SendResult sendMessage(List<byte[]> bodyList, String groupId, String stre
* @param streamId streamId
* @param dt data report timestamp
* @param msgUUID msg uuid
* @param timeout
* @param timeUnit
* @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ
* @return SendResult.OK means success
*/
public SendResult sendMessage(List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID,
long timeout, TimeUnit timeUnit, boolean isProxySend) {
public SendResult sendMessage(List<byte[]> bodyList,
String groupId, String streamId, long dt, String msgUUID, boolean isProxySend) {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)) {
return SendResult.INVALID_ATTRIBUTES;
Expand All @@ -408,8 +399,7 @@ public SendResult sendMessage(List<byte[]> bodyList, String groupId, String stre
isGroupIdTransfer, dt / 1000,
idGenerator.getNextInt(), groupId, streamId, proxySend);
encodeObject.setSupportLF(isSupportLF);
Function<Sender, SendResult> sendOperation = (sender) -> sender.syncSendMessage(encodeObject, msgUUID,
timeout, timeUnit);
Function<Sender, SendResult> sendOperation = (sender) -> sender.syncSendMessage(encodeObject, msgUUID);
return attemptSendMessage(sendOperation);
} else if (msgtype == 3 || msgtype == 5) {
if (isProxySend) {
Expand All @@ -422,23 +412,45 @@ public SendResult sendMessage(List<byte[]> bodyList, String groupId, String stre
sendOperation = (sender) -> sender.syncSendMessage(new EncodeObject(bodyList,
"groupId=" + groupId + "&streamId=" + streamId + "&dt=" + finalDt + "&cp=snappy" + "&cnt="
+ bodyList.size() + finalProxySend,
idGenerator.getNextId(), this.getMsgtype(),
true, groupId), msgUUID, timeout, timeUnit);
idGenerator.getNextId(), this.getMsgtype(), true, groupId), msgUUID);
} else {
sendOperation = (sender) -> sender.syncSendMessage(new EncodeObject(bodyList,
"groupId=" + groupId + "&streamId=" + streamId + "&dt=" + finalDt + "&cnt=" + bodyList.size()
+ finalProxySend,
idGenerator.getNextId(), this.getMsgtype(),
false, groupId), msgUUID, timeout, timeUnit);
idGenerator.getNextId(), this.getMsgtype(), false, groupId), msgUUID);
}
return attemptSendMessage(sendOperation);
}
return null;
}

public SendResult sendMessage(List<byte[]> bodyList, String groupId, String streamId, long dt,
String msgUUID, long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) {
return sendMessage(bodyList, groupId, streamId, dt, msgUUID, timeout, timeUnit, extraAttrMap, false);
String msgUUID, Map<String, String> extraAttrMap) {
return sendMessage(bodyList, groupId, streamId, dt, msgUUID, extraAttrMap, false);
}

@Override
public void asyncSendMessage(SendMessageCallback callback, byte[] body, String groupId, String streamId, long dt,
String msgUUID, Map<String, String> extraAttrMap) throws ProxysdkException {

}

@Override
public void asyncSendMessage(SendMessageCallback callback, byte[] body, String groupId, String streamId, long dt,
String msgUUID) throws ProxysdkException {

}

@Override
public void asyncSendMessage(SendMessageCallback callback, List<byte[]> bodyList, String groupId, String streamId,
long dt, String msgUUID) throws ProxysdkException {

}

@Override
public void asyncSendMessage(SendMessageCallback callback, List<byte[]> bodyList, String groupId, String streamId,
long dt, String msgUUID, Map<String, String> extraAttrMap) throws ProxysdkException {

}

/**
Expand All @@ -449,14 +461,12 @@ public SendResult sendMessage(List<byte[]> bodyList, String groupId, String stre
* @param streamId streamId
* @param dt data report timestamp
* @param msgUUID msg uuid
* @param timeout
* @param timeUnit
* @param extraAttrMap extra attributes
* @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ
* @return SendResult.OK means success
*/
public SendResult sendMessage(List<byte[]> bodyList, String groupId, String streamId, long dt,
String msgUUID, long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap, boolean isProxySend) {
String msgUUID, Map<String, String> extraAttrMap, boolean isProxySend) {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt) || !ProxyUtils.isAttrKeysValid(
extraAttrMap)) {
Expand All @@ -476,8 +486,7 @@ public SendResult sendMessage(List<byte[]> bodyList, String groupId, String stre
isGroupIdTransfer, dt / 1000,
idGenerator.getNextInt(), groupId, streamId, attrs.toString());
encodeObject.setSupportLF(isSupportLF);
Function<Sender, SendResult> sendOperation = (sender) -> sender.syncSendMessage(encodeObject, msgUUID,
timeout, timeUnit);
Function<Sender, SendResult> sendOperation = (sender) -> sender.syncSendMessage(encodeObject, msgUUID);
return attemptSendMessage(sendOperation);
} else if (msgtype == 3 || msgtype == 5) {
attrs.append("&groupId=").append(groupId).append("&streamId=").append(streamId)
Expand All @@ -486,13 +495,12 @@ public SendResult sendMessage(List<byte[]> bodyList, String groupId, String stre
attrs.append("&cp=snappy");
Function<Sender, SendResult> sendOperation =
(sender) -> sender.syncSendMessage(new EncodeObject(bodyList, attrs.toString(),
idGenerator.getNextId(), this.getMsgtype(), true, groupId), msgUUID, timeout, timeUnit);
idGenerator.getNextId(), this.getMsgtype(), true, groupId), msgUUID);
return attemptSendMessage(sendOperation);
} else {
Function<Sender, SendResult> sendOperation =
(sender) -> sender.syncSendMessage(new EncodeObject(bodyList, attrs.toString(),
idGenerator.getNextId(), this.getMsgtype(), false, groupId), msgUUID, timeout,
timeUnit);
idGenerator.getNextId(), this.getMsgtype(), false, groupId), msgUUID);
return attemptSendMessage(sendOperation);
}
}
Expand Down
Loading
Loading