Skip to content

Commit

Permalink
[INLONG-11463][SDK] Remove deprecated APIs in the DefaultMessageSende…
Browse files Browse the repository at this point in the history
…r class (#11464)

Co-authored-by: gosonzhang <[email protected]>
  • Loading branch information
gosonzhang and gosonzhang authored Nov 6, 2024
1 parent 58fe6ee commit a658a09
Show file tree
Hide file tree
Showing 11 changed files with 25 additions and 478 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,6 @@ public class CommonConstants {
public static final String PROXY_SENDER_MAX_RETRY = "proxy.sender.maxRetry";
public static final int DEFAULT_PROXY_SENDER_MAX_RETRY = 5;

public static final String PROXY_IS_FILE = "proxy.isFile";
public static final boolean DEFAULT_IS_FILE = false;

public static final String PROXY_CLIENT_IO_THREAD_NUM = "client.iothread.num";
public static final int DEFAULT_PROXY_CLIENT_IO_THREAD_NUM =
Runtime.getRuntime().availableProcessors();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ public class SenderManager {
private final int aliveConnectionNum;
private final boolean isCompress;
private final int msgType;
private final boolean isFile;
private final long maxSenderTimeout;
private final int maxSenderRetry;
private final long retrySleepTime;
Expand Down Expand Up @@ -133,7 +132,6 @@ public SenderManager(InstanceProfile profile, String inlongGroupId, String sourc
CommonConstants.PROXY_SENDER_MAX_RETRY, CommonConstants.DEFAULT_PROXY_SENDER_MAX_RETRY);
retrySleepTime = agentConf.getLong(
CommonConstants.PROXY_RETRY_SLEEP, CommonConstants.DEFAULT_PROXY_RETRY_SLEEP);
isFile = profile.getBoolean(CommonConstants.PROXY_IS_FILE, CommonConstants.DEFAULT_IS_FILE);
ioThreadNum = profile.getInt(CommonConstants.PROXY_CLIENT_IO_THREAD_NUM,
CommonConstants.DEFAULT_PROXY_CLIENT_IO_THREAD_NUM);
enableBusyWait = profile.getBoolean(CommonConstants.PROXY_CLIENT_ENABLE_BUSY_WAIT,
Expand Down Expand Up @@ -200,7 +198,6 @@ private void createMessageSender() throws Exception {
ProxyClientConfig proxyClientConfig = new ProxyClientConfig(managerAddr, inlongGroupId, authSecretId,
authSecretKey);
proxyClientConfig.setTotalAsyncCallbackSize(totalAsyncBufSize);
proxyClientConfig.setFile(isFile);
proxyClientConfig.setAliveConnections(aliveConnectionNum);

proxyClientConfig.setIoThreadNum(ioThreadNum);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.inlong.common.msg.AttributeConstants;
import org.apache.inlong.common.util.MessageUtils;
import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
import org.apache.inlong.sdk.dataproxy.common.FileCallback;
import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
Expand Down Expand Up @@ -833,122 +832,6 @@ private void addIndexCnt(String groupId, String streamId, long cnt) {
}
}

@Deprecated
public void asyncsendMessageData(FileCallback callback, List<byte[]> bodyList, String groupId, String streamId,
long dt, int sid, boolean isSupportLF, String msgUUID, long timeout, TimeUnit timeUnit,
Map<String, String> extraAttrMap) throws ProxysdkException {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)
|| !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
}
if (!ProxyUtils.isBodyLengthValid(bodyList, maxPacketLength)) {
throw new ProxysdkException(SendResult.BODY_EXCEED_MAX_LEN.toString());
}
addIndexCnt(groupId, streamId, bodyList.size());

StringBuilder attrs = MessageUtils.convertAttrToStr(extraAttrMap);

if (msgtype == 7 || msgtype == 8) {
EncodeObject encodeObject = new EncodeObject(bodyList, msgtype,
isCompress, isReport, isGroupIdTransfer,
dt / 1000, sid, groupId, streamId, attrs.toString(), "data", "");
encodeObject.setSupportLF(isSupportLF);
sender.asyncSendMessageIndex(encodeObject, callback, msgUUID, timeout, timeUnit);
}
}

@Deprecated
private void asyncSendMetric(FileCallback callback, byte[] body, String groupId, String streamId, long dt, int sid,
String ip, String msgUUID, long timeout, TimeUnit timeUnit, String messageKey) throws ProxysdkException {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
}
if (!ProxyUtils.isBodyLengthValid(body, maxPacketLength)) {
throw new ProxysdkException(SendResult.BODY_EXCEED_MAX_LEN.toString());
}
boolean isCompressEnd = false;
if (msgtype == 7 || msgtype == 8) {
sender.asyncSendMessageIndex(new EncodeObject(body, msgtype, isCompressEnd,
isReport, isGroupIdTransfer, dt / 1000,
sid, groupId, streamId, "", messageKey, ip), callback, msgUUID, timeout, timeUnit);
}
}

@Deprecated
public void asyncsendMessageProxy(FileCallback callback, byte[] body, String groupId, String streamId, long dt,
int sid, String ip, String msgUUID, long timeout, TimeUnit timeUnit) throws ProxysdkException {
asyncSendMetric(callback, body, groupId, streamId, dt, sid, ip, msgUUID, timeout,
timeUnit, "minute");
}

@Deprecated
public void asyncsendMessageFile(FileCallback callback, byte[] body, String groupId, String streamId, long dt,
int sid, String msgUUID, long timeout, TimeUnit timeUnit) throws ProxysdkException {
asyncSendMetric(callback, body, groupId, streamId, dt, sid, "", msgUUID, timeout, timeUnit,
"file");
}

@Deprecated
public String sendMessageData(List<byte[]> bodyList, String groupId, String streamId, long dt, int sid,
boolean isSupportLF, String msgUUID, long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)
|| !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
return SendResult.INVALID_ATTRIBUTES.toString();
}
if (!ProxyUtils.isBodyLengthValid(bodyList, maxPacketLength)) {
return SendResult.BODY_EXCEED_MAX_LEN.toString();
}
addIndexCnt(groupId, streamId, bodyList.size());

StringBuilder attrs = MessageUtils.convertAttrToStr(extraAttrMap);

if (msgtype == 7 || msgtype == 8) {
EncodeObject encodeObject = new EncodeObject(bodyList, msgtype, isCompress,
isReport, isGroupIdTransfer, dt / 1000,
sid, groupId, streamId, attrs.toString(), "data", "");
encodeObject.setSupportLF(isSupportLF);
Function<Sender, String> sendOperation = (sender) -> sender.syncSendMessageIndex(encodeObject, msgUUID,
timeout, timeUnit);
return attemptSendMessageIndex(sendOperation);
}
return null;
}

@Deprecated
private String sendMetric(byte[] body, String groupId, String streamId, long dt, int sid, String ip, String msgUUID,
long timeout, TimeUnit timeUnit, String messageKey) {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
return SendResult.INVALID_ATTRIBUTES.toString();
}
if (!ProxyUtils.isBodyLengthValid(body, maxPacketLength)) {
return SendResult.BODY_EXCEED_MAX_LEN.toString();
}
if (msgtype == 7 || msgtype == 8) {
EncodeObject encodeObject = new EncodeObject(body, msgtype, false, isReport,
isGroupIdTransfer, dt / 1000, sid, groupId, streamId, "", messageKey, ip);
Function<Sender, String> sendOperation = (sender) -> sender.syncSendMessageIndex(encodeObject, msgUUID,
timeout, timeUnit);
return attemptSendMessageIndex(sendOperation);
}
return null;
}

@Deprecated
public String sendMessageProxy(byte[] body, String groupId, String streamId, long dt, int sid, String ip,
String msgUUID, long timeout, TimeUnit timeUnit) {
return sendMetric(body, groupId, streamId, dt, sid, ip, msgUUID, timeout, timeUnit, "minute");
}

@Deprecated
public String sendMessageFile(byte[] body, String groupId, String streamId, long dt, int sid, String msgUUID,
long timeout, TimeUnit timeUnit) {
return sendMetric(body, groupId, streamId, dt, sid, "", msgUUID, timeout, timeUnit, "file");
}

private void shutdownInternalThreads() {
indexCol.shutDown();
MANAGER_FETCHER_THREAD_STARTED.set(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ public class ProxyClientConfig {
private int proxyUpdateIntervalMinutes;
private int proxyUpdateMaxRetry;
private String inlongGroupId;
private boolean isFile = false;
private boolean requestByHttp = true;
private boolean isNeedDataEncry = false;
private boolean needAuthentication = false;
Expand Down Expand Up @@ -196,14 +195,6 @@ public boolean isRequestByHttp() {
return requestByHttp;
}

public boolean isFile() {
return isFile;
}

public void setFile(boolean file) {
isFile = file;
}

public String getInlongGroupId() {
return inlongGroupId;
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
public interface SendMessageCallback {

/* Invoked when a message is confirmed by TDBus. */
public void onMessageAck(SendResult result);
void onMessageAck(SendResult result);

/* Invoked when a message transportation interrupted by an exception. */
public void onException(Throwable e);
void onException(Throwable e);
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,17 @@
package org.apache.inlong.sdk.dataproxy.example;

import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
import org.apache.inlong.sdk.dataproxy.common.FileCallback;
import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
import org.apache.inlong.sdk.dataproxy.common.SendResult;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyMessageCallBack extends FileCallback {
public class MyMessageCallBack implements SendMessageCallback {

private static final Logger logger = LoggerFactory
.getLogger(MyMessageCallBack.class);

private DefaultMessageSender messageSender = null;
private Event event = null;

Expand All @@ -41,10 +42,7 @@ public MyMessageCallBack(DefaultMessageSender messageSender, Event event) {
this.event = event;
}

public void onMessageAck(String result) {
logger.info("onMessageAck return result = {}", result);
}

@Override
public void onMessageAck(SendResult result) {
if (result == SendResult.OK) {
logger.info("onMessageAck return Ok");
Expand All @@ -53,6 +51,7 @@ public void onMessageAck(SendResult result) {
}
}

@Override
public void onException(Throwable e) {
logger.error("Send message failure, error {}", e.getMessage());
e.printStackTrace();
Expand Down
Loading

0 comments on commit a658a09

Please sign in to comment.