Skip to content

Commit

Permalink
Merge branch 'master' into INLONG-11401
Browse files Browse the repository at this point in the history
  • Loading branch information
dockerzhang authored Nov 20, 2024
2 parents 1f0c99e + 81fb821 commit e5ded2e
Show file tree
Hide file tree
Showing 81 changed files with 1,332 additions and 643 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@
import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.constant.TaskConstants.FILE_TASK_RETRY;
import static org.apache.inlong.agent.constant.TaskConstants.INSTANCE_STATE;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_MQ_CLUSTERS;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_MQ_TOPIC;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_RETRY;

/**
* job profile which contains details describing properties of one job.
Expand Down Expand Up @@ -200,6 +200,6 @@ public int compareTo(InstanceProfile object) {
}

public boolean isRetry() {
return getBoolean(TASK_RETRY, false);
return getBoolean(FILE_TASK_RETRY, false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_RETRY;
import static org.apache.inlong.agent.constant.TaskConstants.FILE_TASK_RETRY;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_STATE;

/**
Expand Down Expand Up @@ -82,7 +82,7 @@ public void setState(TaskStateEnum state) {
}

public boolean isRetry() {
return getBoolean(TASK_RETRY, false);
return getBoolean(FILE_TASK_RETRY, false);
}

public String getTaskClass() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public class AgentConstants {

public static final String AUDIT_ENABLE = "audit.enable";
public static final boolean DEFAULT_AUDIT_ENABLE = true;
public static final String AUDIT_PROXY_ADDRESS = "audit.proxys";

public static final String AGENT_HISTORY_PATH = "agent.history.path";
public static final String DEFAULT_AGENT_HISTORY_PATH = ".history";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ public class TaskConstants extends CommonConstants {
public static final String SOURCE_DATA_CONTENT_STYLE = "task.fileTask.dataContentStyle";
public static final String SOURCE_DATA_SEPARATOR = "task.fileTask.dataSeparator";
public static final String SOURCE_FILTER_STREAMS = "task.fileTask.filterStreams";
public static final String TASK_RETRY = "task.fileTask.retry";
public static final String TASK_START_TIME = "task.fileTask.startTime";
public static final String TASK_END_TIME = "task.fileTask.endTime";
public static final String FILE_TASK_RETRY = "task.fileTask.retry";
public static final String FILE_TASK_TIME_FROM = "task.fileTask.dataTimeFrom";
public static final String FILE_TASK_TIME_TO = "task.fileTask.dataTimeTo";
public static final String FILE_MAX_NUM = "task.fileTask.maxFileCount";
public static final String PREDEFINE_FIELDS = "task.predefinedFields";
public static final String TASK_AUDIT_VERSION = "task.auditVersion";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
import org.apache.inlong.audit.AuditOperator;
import org.apache.inlong.audit.entity.AuditComponent;

import java.util.HashSet;

import static org.apache.inlong.agent.constant.AgentConstants.AUDIT_ENABLE;
import static org.apache.inlong.agent.constant.AgentConstants.AUDIT_PROXY_ADDRESS;
import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AUDIT_ENABLE;
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_ADDR;
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_ID;
Expand Down Expand Up @@ -67,8 +70,14 @@ public class AuditUtils {
public static void initAudit(AbstractConfiguration conf) {
IS_AUDIT = conf.getBoolean(AUDIT_ENABLE, DEFAULT_AUDIT_ENABLE);
if (IS_AUDIT) {
AuditOperator.getInstance().setAuditProxy(AuditComponent.AGENT, conf.get(AGENT_MANAGER_ADDR),
conf.get(AGENT_MANAGER_AUTH_SECRET_ID), conf.get(AGENT_MANAGER_AUTH_SECRET_KEY));
if (conf.hasKey(AUDIT_PROXY_ADDRESS)) {
HashSet<String> address = new HashSet<>();
address.add(conf.get(AUDIT_PROXY_ADDRESS));
AuditOperator.getInstance().setAuditProxy(address);
} else {
AuditOperator.getInstance().setAuditProxy(AuditComponent.AGENT, conf.get(AGENT_MANAGER_ADDR),
conf.get(AGENT_MANAGER_AUTH_SECRET_ID), conf.get(AGENT_MANAGER_AUTH_SECRET_KEY));
}
AuditOperator.getInstance().setLocalIP(conf.get(AgentConstants.AGENT_LOCAL_IP));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ public class FileTask {
private Integer id;
private String cycleUnit;
private Boolean retry;
private Long startTime;
private Long endTime;
private String dataTimeFrom;
private String dataTimeTo;
private String timeOffset;
private String timeZone;
private String addictiveString;
Expand Down Expand Up @@ -91,9 +91,9 @@ public static class FileTaskConfig {

private Boolean retry;

private Long startTime;
private String dataTimeFrom;

private Long endTime;
private String dataTimeTo;

private String pattern;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,8 @@ private static FileTask getFileTask(DataConfig dataConfig) {
fileTask.setMaxFileCount(taskConfig.getMaxFileCount());
fileTask.setRetry(taskConfig.getRetry());
fileTask.setCycleUnit(taskConfig.getCycleUnit());
fileTask.setStartTime(taskConfig.getStartTime());
fileTask.setEndTime(taskConfig.getEndTime());
fileTask.setDataTimeFrom(taskConfig.getDataTimeFrom());
fileTask.setDataTimeTo(taskConfig.getDataTimeTo());
if (taskConfig.getFilterStreams() != null) {
fileTask.setFilterStreams(GSON.toJson(taskConfig.getFilterStreams()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,14 @@ public void teardownAgentHome() {
}
}

public TaskProfile getTaskProfile(int taskId, String pattern, boolean retry, Long startTime, Long endTime,
public TaskProfile getTaskProfile(int taskId, String pattern, boolean retry, String startTime, String endTime,
TaskStateEnum state, String timeZone) {
DataConfig dataConfig = getDataConfig(taskId, pattern, retry, startTime, endTime, state, timeZone);
TaskProfile profile = TaskProfile.convertToTaskProfile(dataConfig);
return profile;
}

private DataConfig getDataConfig(int taskId, String pattern, boolean retry, Long startTime, Long endTime,
private DataConfig getDataConfig(int taskId, String pattern, boolean retry, String startTime, String endTime,
TaskStateEnum state, String timeZone) {
DataConfig dataConfig = new DataConfig();
dataConfig.setInlongGroupId("testGroupId");
Expand All @@ -98,8 +98,8 @@ private DataConfig getDataConfig(int taskId, String pattern, boolean retry, Long
fileTaskConfig.setMaxFileCount(100);
fileTaskConfig.setCycleUnit("h");
fileTaskConfig.setRetry(retry);
fileTaskConfig.setStartTime(startTime);
fileTaskConfig.setEndTime(endTime);
fileTaskConfig.setDataTimeFrom(startTime);
fileTaskConfig.setDataTimeTo(endTime);
dataConfig.setExtParams(GSON.toJson(fileTaskConfig));
return dataConfig;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME;
Expand Down Expand Up @@ -225,26 +222,16 @@ private Runnable configFetchThread() {

private TaskResult getTestConfig(String testDir, int normalTaskId, int retryTaskId, int state) {
List<DataConfig> configs = new ArrayList<>();
String startStr = "2023-07-10 00:00:00";
String endStr = "2023-07-22 00:00:00";
Long start = 0L;
Long end = 0L;
String normalPattern = testDir + "YYYY/YYYYMMDDhhmm_2.log_[0-9]+";
String retryPattern = testDir + "YYYY/YYYYMMDD_1.log_[0-9]+";
try {
Date parse = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(startStr);
start = parse.getTime();
parse = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(endStr);
end = parse.getTime();
} catch (ParseException e) {
e.printStackTrace();
}
configs.add(getTestDataConfig(normalTaskId, normalPattern, false, start, end, CycleUnitType.MINUTE, state));
configs.add(getTestDataConfig(retryTaskId, retryPattern, true, start, end, CycleUnitType.DAY, state));
configs.add(getTestDataConfig(normalTaskId, normalPattern, false, "202307100000", "202307220000",
CycleUnitType.MINUTE, state));
configs.add(
getTestDataConfig(retryTaskId, retryPattern, true, "20230710", "20230722", CycleUnitType.DAY, state));
return TaskResult.builder().dataConfigs(configs).build();
}

private DataConfig getTestDataConfig(int taskId, String pattern, boolean retry, Long startTime, Long endTime,
private DataConfig getTestDataConfig(int taskId, String pattern, boolean retry, String startTime, String endTime,
String cycleUnit, int state) {
DataConfig dataConfig = new DataConfig();
dataConfig.setInlongGroupId("devcloud_group_id");
Expand All @@ -260,8 +247,8 @@ private DataConfig getTestDataConfig(int taskId, String pattern, boolean retry,
fileTaskConfig.setMaxFileCount(100);
fileTaskConfig.setCycleUnit(cycleUnit);
fileTaskConfig.setRetry(retry);
fileTaskConfig.setStartTime(startTime);
fileTaskConfig.setEndTime(endTime);
fileTaskConfig.setDataTimeFrom(startTime);
fileTaskConfig.setDataTimeTo(endTime);
fileTaskConfig.setDataContentStyle("CSV");
fileTaskConfig.setDataSeparator("|");
dataConfig.setExtParams(GSON.toJson(fileTaskConfig));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.constant.TaskConstants;
import org.apache.inlong.agent.plugin.utils.file.FileDataUtils;
import org.apache.inlong.agent.plugin.task.file.FileDataUtils;

import java.io.IOException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
import org.apache.inlong.agent.plugin.sources.file.extend.DefaultExtendedHandler;
import org.apache.inlong.agent.plugin.utils.file.FileDataUtils;
import org.apache.inlong.agent.plugin.task.file.FileDataUtils;
import org.apache.inlong.agent.utils.AgentUtils;

import org.slf4j.Logger;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.inlong.agent.plugin.utils.file;
package org.apache.inlong.agent.plugin.task.file;

import java.io.IOException;
import java.nio.file.Files;
Expand Down
Loading

0 comments on commit e5ded2e

Please sign in to comment.