Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-11463][SDK] Remove deprecated APIs in the DefaultMessageSender class #11464

Merged
merged 1 commit into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,6 @@ public class CommonConstants {
public static final String PROXY_SENDER_MAX_RETRY = "proxy.sender.maxRetry";
public static final int DEFAULT_PROXY_SENDER_MAX_RETRY = 5;

public static final String PROXY_IS_FILE = "proxy.isFile";
public static final boolean DEFAULT_IS_FILE = false;

public static final String PROXY_CLIENT_IO_THREAD_NUM = "client.iothread.num";
public static final int DEFAULT_PROXY_CLIENT_IO_THREAD_NUM =
Runtime.getRuntime().availableProcessors();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ public class SenderManager {
private final int aliveConnectionNum;
private final boolean isCompress;
private final int msgType;
private final boolean isFile;
private final long maxSenderTimeout;
private final int maxSenderRetry;
private final long retrySleepTime;
Expand Down Expand Up @@ -133,7 +132,6 @@ public SenderManager(InstanceProfile profile, String inlongGroupId, String sourc
CommonConstants.PROXY_SENDER_MAX_RETRY, CommonConstants.DEFAULT_PROXY_SENDER_MAX_RETRY);
retrySleepTime = agentConf.getLong(
CommonConstants.PROXY_RETRY_SLEEP, CommonConstants.DEFAULT_PROXY_RETRY_SLEEP);
isFile = profile.getBoolean(CommonConstants.PROXY_IS_FILE, CommonConstants.DEFAULT_IS_FILE);
ioThreadNum = profile.getInt(CommonConstants.PROXY_CLIENT_IO_THREAD_NUM,
CommonConstants.DEFAULT_PROXY_CLIENT_IO_THREAD_NUM);
enableBusyWait = profile.getBoolean(CommonConstants.PROXY_CLIENT_ENABLE_BUSY_WAIT,
Expand Down Expand Up @@ -200,7 +198,6 @@ private void createMessageSender() throws Exception {
ProxyClientConfig proxyClientConfig = new ProxyClientConfig(managerAddr, inlongGroupId, authSecretId,
authSecretKey);
proxyClientConfig.setTotalAsyncCallbackSize(totalAsyncBufSize);
proxyClientConfig.setFile(isFile);
proxyClientConfig.setAliveConnections(aliveConnectionNum);

proxyClientConfig.setIoThreadNum(ioThreadNum);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.inlong.common.msg.AttributeConstants;
import org.apache.inlong.common.util.MessageUtils;
import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
import org.apache.inlong.sdk.dataproxy.common.FileCallback;
import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
Expand Down Expand Up @@ -833,122 +832,6 @@ private void addIndexCnt(String groupId, String streamId, long cnt) {
}
}

@Deprecated
public void asyncsendMessageData(FileCallback callback, List<byte[]> bodyList, String groupId, String streamId,
long dt, int sid, boolean isSupportLF, String msgUUID, long timeout, TimeUnit timeUnit,
Map<String, String> extraAttrMap) throws ProxysdkException {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)
|| !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
}
if (!ProxyUtils.isBodyLengthValid(bodyList, maxPacketLength)) {
throw new ProxysdkException(SendResult.BODY_EXCEED_MAX_LEN.toString());
}
addIndexCnt(groupId, streamId, bodyList.size());

StringBuilder attrs = MessageUtils.convertAttrToStr(extraAttrMap);

if (msgtype == 7 || msgtype == 8) {
EncodeObject encodeObject = new EncodeObject(bodyList, msgtype,
isCompress, isReport, isGroupIdTransfer,
dt / 1000, sid, groupId, streamId, attrs.toString(), "data", "");
encodeObject.setSupportLF(isSupportLF);
sender.asyncSendMessageIndex(encodeObject, callback, msgUUID, timeout, timeUnit);
}
}

@Deprecated
private void asyncSendMetric(FileCallback callback, byte[] body, String groupId, String streamId, long dt, int sid,
String ip, String msgUUID, long timeout, TimeUnit timeUnit, String messageKey) throws ProxysdkException {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
}
if (!ProxyUtils.isBodyLengthValid(body, maxPacketLength)) {
throw new ProxysdkException(SendResult.BODY_EXCEED_MAX_LEN.toString());
}
boolean isCompressEnd = false;
if (msgtype == 7 || msgtype == 8) {
sender.asyncSendMessageIndex(new EncodeObject(body, msgtype, isCompressEnd,
isReport, isGroupIdTransfer, dt / 1000,
sid, groupId, streamId, "", messageKey, ip), callback, msgUUID, timeout, timeUnit);
}
}

@Deprecated
public void asyncsendMessageProxy(FileCallback callback, byte[] body, String groupId, String streamId, long dt,
int sid, String ip, String msgUUID, long timeout, TimeUnit timeUnit) throws ProxysdkException {
asyncSendMetric(callback, body, groupId, streamId, dt, sid, ip, msgUUID, timeout,
timeUnit, "minute");
}

@Deprecated
public void asyncsendMessageFile(FileCallback callback, byte[] body, String groupId, String streamId, long dt,
int sid, String msgUUID, long timeout, TimeUnit timeUnit) throws ProxysdkException {
asyncSendMetric(callback, body, groupId, streamId, dt, sid, "", msgUUID, timeout, timeUnit,
"file");
}

@Deprecated
public String sendMessageData(List<byte[]> bodyList, String groupId, String streamId, long dt, int sid,
boolean isSupportLF, String msgUUID, long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)
|| !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
return SendResult.INVALID_ATTRIBUTES.toString();
}
if (!ProxyUtils.isBodyLengthValid(bodyList, maxPacketLength)) {
return SendResult.BODY_EXCEED_MAX_LEN.toString();
}
addIndexCnt(groupId, streamId, bodyList.size());

StringBuilder attrs = MessageUtils.convertAttrToStr(extraAttrMap);

if (msgtype == 7 || msgtype == 8) {
EncodeObject encodeObject = new EncodeObject(bodyList, msgtype, isCompress,
isReport, isGroupIdTransfer, dt / 1000,
sid, groupId, streamId, attrs.toString(), "data", "");
encodeObject.setSupportLF(isSupportLF);
Function<Sender, String> sendOperation = (sender) -> sender.syncSendMessageIndex(encodeObject, msgUUID,
timeout, timeUnit);
return attemptSendMessageIndex(sendOperation);
}
return null;
}

@Deprecated
private String sendMetric(byte[] body, String groupId, String streamId, long dt, int sid, String ip, String msgUUID,
long timeout, TimeUnit timeUnit, String messageKey) {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
return SendResult.INVALID_ATTRIBUTES.toString();
}
if (!ProxyUtils.isBodyLengthValid(body, maxPacketLength)) {
return SendResult.BODY_EXCEED_MAX_LEN.toString();
}
if (msgtype == 7 || msgtype == 8) {
EncodeObject encodeObject = new EncodeObject(body, msgtype, false, isReport,
isGroupIdTransfer, dt / 1000, sid, groupId, streamId, "", messageKey, ip);
Function<Sender, String> sendOperation = (sender) -> sender.syncSendMessageIndex(encodeObject, msgUUID,
timeout, timeUnit);
return attemptSendMessageIndex(sendOperation);
}
return null;
}

@Deprecated
public String sendMessageProxy(byte[] body, String groupId, String streamId, long dt, int sid, String ip,
String msgUUID, long timeout, TimeUnit timeUnit) {
return sendMetric(body, groupId, streamId, dt, sid, ip, msgUUID, timeout, timeUnit, "minute");
}

@Deprecated
public String sendMessageFile(byte[] body, String groupId, String streamId, long dt, int sid, String msgUUID,
long timeout, TimeUnit timeUnit) {
return sendMetric(body, groupId, streamId, dt, sid, "", msgUUID, timeout, timeUnit, "file");
}

private void shutdownInternalThreads() {
indexCol.shutDown();
MANAGER_FETCHER_THREAD_STARTED.set(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ public class ProxyClientConfig {
private int proxyUpdateIntervalMinutes;
private int proxyUpdateMaxRetry;
private String inlongGroupId;
private boolean isFile = false;
private boolean requestByHttp = true;
private boolean isNeedDataEncry = false;
private boolean needAuthentication = false;
Expand Down Expand Up @@ -196,14 +195,6 @@ public boolean isRequestByHttp() {
return requestByHttp;
}

public boolean isFile() {
return isFile;
}

public void setFile(boolean file) {
isFile = file;
}

public String getInlongGroupId() {
return inlongGroupId;
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
public interface SendMessageCallback {

/* Invoked when a message is confirmed by TDBus. */
public void onMessageAck(SendResult result);
void onMessageAck(SendResult result);

/* Invoked when a message transportation interrupted by an exception. */
public void onException(Throwable e);
void onException(Throwable e);
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,17 @@
package org.apache.inlong.sdk.dataproxy.example;

import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
import org.apache.inlong.sdk.dataproxy.common.FileCallback;
import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
import org.apache.inlong.sdk.dataproxy.common.SendResult;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyMessageCallBack extends FileCallback {
public class MyMessageCallBack implements SendMessageCallback {

private static final Logger logger = LoggerFactory
.getLogger(MyMessageCallBack.class);

private DefaultMessageSender messageSender = null;
private Event event = null;

Expand All @@ -41,10 +42,7 @@ public MyMessageCallBack(DefaultMessageSender messageSender, Event event) {
this.event = event;
}

public void onMessageAck(String result) {
logger.info("onMessageAck return result = {}", result);
}

@Override
public void onMessageAck(SendResult result) {
if (result == SendResult.OK) {
logger.info("onMessageAck return Ok");
Expand All @@ -53,6 +51,7 @@ public void onMessageAck(SendResult result) {
}
}

@Override
public void onException(Throwable e) {
logger.error("Send message failure, error {}", e.getMessage());
e.printStackTrace();
Expand Down
Loading
Loading