Skip to content

Commit

Permalink
[INLONG-11506][Agent] Task start and end time using string type (#11507)
Browse files Browse the repository at this point in the history
  • Loading branch information
justinwwhuang authored Nov 19, 2024
1 parent 91a0cb3 commit 81fb821
Show file tree
Hide file tree
Showing 34 changed files with 266 additions and 530 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@
import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.constant.TaskConstants.FILE_TASK_RETRY;
import static org.apache.inlong.agent.constant.TaskConstants.INSTANCE_STATE;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_MQ_CLUSTERS;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_MQ_TOPIC;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_RETRY;

/**
* job profile which contains details describing properties of one job.
Expand Down Expand Up @@ -200,6 +200,6 @@ public int compareTo(InstanceProfile object) {
}

/**
 * Reports whether this profile is flagged as a retry by reading a boolean
 * property, passing {@code false} as the default value.
 *
 * NOTE(review): this span is a rendered diff hunk, so both the old and the
 * new return statement are shown. Presumably the TASK_RETRY line is the
 * removed one and the FILE_TASK_RETRY line replaces it; confirm against the
 * commit. Both constants are declared with the value "task.fileTask.retry"
 * in the TaskConstants hunk, so the key that is looked up does not change.
 *
 * @return the value returned by {@code getBoolean} for the retry key
 */
public boolean isRetry() {
return getBoolean(TASK_RETRY, false);
return getBoolean(FILE_TASK_RETRY, false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_RETRY;
import static org.apache.inlong.agent.constant.TaskConstants.FILE_TASK_RETRY;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_STATE;

/**
Expand Down Expand Up @@ -82,7 +82,7 @@ public void setState(TaskStateEnum state) {
}

/**
 * Reports whether this task profile is flagged as a retry by reading a
 * boolean property, passing {@code false} as the default value.
 *
 * NOTE(review): rendered diff hunk; the first return (TASK_RETRY) is
 * presumably the removed line and the second (FILE_TASK_RETRY) the added
 * one. Confirm against the commit before treating either as current code.
 *
 * @return the value returned by {@code getBoolean} for the retry key
 */
public boolean isRetry() {
return getBoolean(TASK_RETRY, false);
return getBoolean(FILE_TASK_RETRY, false);
}

public String getTaskClass() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ public class TaskConstants extends CommonConstants {
// Property-key constants; most of the keys in this hunk live under the "task.fileTask." prefix.
public static final String SOURCE_DATA_CONTENT_STYLE = "task.fileTask.dataContentStyle";
public static final String SOURCE_DATA_SEPARATOR = "task.fileTask.dataSeparator";
public static final String SOURCE_FILTER_STREAMS = "task.fileTask.filterStreams";
// NOTE(review): rendered diff; the next three declarations are presumably the removed (pre-change) names.
public static final String TASK_RETRY = "task.fileTask.retry";
public static final String TASK_START_TIME = "task.fileTask.startTime";
public static final String TASK_END_TIME = "task.fileTask.endTime";
// Presumably the replacements: the retry key string is identical to TASK_RETRY, while the two
// time keys change from "startTime"/"endTime" to "dataTimeFrom"/"dataTimeTo".
public static final String FILE_TASK_RETRY = "task.fileTask.retry";
public static final String FILE_TASK_TIME_FROM = "task.fileTask.dataTimeFrom";
public static final String FILE_TASK_TIME_TO = "task.fileTask.dataTimeTo";
public static final String FILE_MAX_NUM = "task.fileTask.maxFileCount";
public static final String PREDEFINE_FIELDS = "task.predefinedFields";
public static final String TASK_AUDIT_VERSION = "task.auditVersion";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ public class FileTask {
// Fields of FileTask as shown in this diff hunk.
private Integer id;
private String cycleUnit;
private Boolean retry;
// NOTE(review): rendered diff; the two Long fields are presumably the removed declarations.
private Long startTime;
private Long endTime;
// Presumably their String-typed replacements (the commit title says start and end time move to string type).
private String dataTimeFrom;
private String dataTimeTo;
private String timeOffset;
private String timeZone;
private String addictiveString;
Expand Down Expand Up @@ -91,9 +91,9 @@ public static class FileTaskConfig {

// Fields of the nested FileTaskConfig as shown in this diff hunk.
private Boolean retry;

// NOTE(review): rendered diff; each Long field is presumably the removed declaration and the
// String field that follows it the replacement. Confirm against the commit.
private Long startTime;
private String dataTimeFrom;

private Long endTime;
private String dataTimeTo;

private String pattern;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,8 @@ private static FileTask getFileTask(DataConfig dataConfig) {
fileTask.setMaxFileCount(taskConfig.getMaxFileCount());
fileTask.setRetry(taskConfig.getRetry());
fileTask.setCycleUnit(taskConfig.getCycleUnit());
fileTask.setStartTime(taskConfig.getStartTime());
fileTask.setEndTime(taskConfig.getEndTime());
fileTask.setDataTimeFrom(taskConfig.getDataTimeFrom());
fileTask.setDataTimeTo(taskConfig.getDataTimeTo());
if (taskConfig.getFilterStreams() != null) {
fileTask.setFilterStreams(GSON.toJson(taskConfig.getFilterStreams()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,14 @@ public void teardownAgentHome() {
}
}

/**
 * Builds a TaskProfile from the given arguments: the arguments are passed
 * unchanged to {@code getDataConfig}, and the resulting DataConfig is
 * converted with {@code TaskProfile.convertToTaskProfile}.
 *
 * NOTE(review): rendered diff hunk; two signature lines are shown. The one
 * taking {@code Long startTime, Long endTime} is presumably the removed
 * version and the one taking {@code String} the current one. Confirm.
 *
 * @param taskId    forwarded to getDataConfig
 * @param pattern   forwarded to getDataConfig
 * @param retry     forwarded to getDataConfig
 * @param startTime forwarded to getDataConfig
 * @param endTime   forwarded to getDataConfig
 * @param state     forwarded to getDataConfig
 * @param timeZone  forwarded to getDataConfig
 * @return the profile produced by TaskProfile.convertToTaskProfile
 */
public TaskProfile getTaskProfile(int taskId, String pattern, boolean retry, Long startTime, Long endTime,
public TaskProfile getTaskProfile(int taskId, String pattern, boolean retry, String startTime, String endTime,
TaskStateEnum state, String timeZone) {
DataConfig dataConfig = getDataConfig(taskId, pattern, retry, startTime, endTime, state, timeZone);
TaskProfile profile = TaskProfile.convertToTaskProfile(dataConfig);
return profile;
}

private DataConfig getDataConfig(int taskId, String pattern, boolean retry, Long startTime, Long endTime,
private DataConfig getDataConfig(int taskId, String pattern, boolean retry, String startTime, String endTime,
TaskStateEnum state, String timeZone) {
DataConfig dataConfig = new DataConfig();
dataConfig.setInlongGroupId("testGroupId");
Expand All @@ -98,8 +98,8 @@ private DataConfig getDataConfig(int taskId, String pattern, boolean retry, Long
fileTaskConfig.setMaxFileCount(100);
fileTaskConfig.setCycleUnit("h");
fileTaskConfig.setRetry(retry);
fileTaskConfig.setStartTime(startTime);
fileTaskConfig.setEndTime(endTime);
fileTaskConfig.setDataTimeFrom(startTime);
fileTaskConfig.setDataTimeTo(endTime);
dataConfig.setExtParams(GSON.toJson(fileTaskConfig));
return dataConfig;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME;
Expand Down Expand Up @@ -225,26 +222,16 @@ private Runnable configFetchThread() {

/**
 * Builds a TaskResult holding two test DataConfig entries: one for
 * {@code normalTaskId} with retry=false and CycleUnitType.MINUTE, and one
 * for {@code retryTaskId} with retry=true and CycleUnitType.DAY. Both file
 * patterns are formed by prefixing {@code testDir}.
 *
 * NOTE(review): rendered diff hunk; old and new lines are interleaved. The
 * block that parses startStr/endStr with SimpleDateFormat and passes
 * {@code start}/{@code end} is presumably the removed version; the calls
 * that pass string literals ("202307100000", "20230710", ...) are presumably
 * the replacement. Confirm against the commit.
 */
private TaskResult getTestConfig(String testDir, int normalTaskId, int retryTaskId, int state) {
List<DataConfig> configs = new ArrayList<>();
// Presumably pre-change: time bounds held as text and converted to epoch milliseconds below.
String startStr = "2023-07-10 00:00:00";
String endStr = "2023-07-22 00:00:00";
Long start = 0L;
Long end = 0L;
String normalPattern = testDir + "YYYY/YYYYMMDDhhmm_2.log_[0-9]+";
String retryPattern = testDir + "YYYY/YYYYMMDD_1.log_[0-9]+";
try {
Date parse = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(startStr);
start = parse.getTime();
parse = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(endStr);
end = parse.getTime();
} catch (ParseException e) {
// On a parse failure the stack trace is printed and start/end keep the values assigned so far.
e.printStackTrace();
}
configs.add(getTestDataConfig(normalTaskId, normalPattern, false, start, end, CycleUnitType.MINUTE, state));
configs.add(getTestDataConfig(retryTaskId, retryPattern, true, start, end, CycleUnitType.DAY, state));
// Presumably post-change: the bounds are passed as strings whose length differs per cycle unit
// (12 digits for the MINUTE task, 8 digits for the DAY task).
configs.add(getTestDataConfig(normalTaskId, normalPattern, false, "202307100000", "202307220000",
CycleUnitType.MINUTE, state));
configs.add(
getTestDataConfig(retryTaskId, retryPattern, true, "20230710", "20230722", CycleUnitType.DAY, state));
return TaskResult.builder().dataConfigs(configs).build();
}

private DataConfig getTestDataConfig(int taskId, String pattern, boolean retry, Long startTime, Long endTime,
private DataConfig getTestDataConfig(int taskId, String pattern, boolean retry, String startTime, String endTime,
String cycleUnit, int state) {
DataConfig dataConfig = new DataConfig();
dataConfig.setInlongGroupId("devcloud_group_id");
Expand All @@ -260,8 +247,8 @@ private DataConfig getTestDataConfig(int taskId, String pattern, boolean retry,
fileTaskConfig.setMaxFileCount(100);
fileTaskConfig.setCycleUnit(cycleUnit);
fileTaskConfig.setRetry(retry);
fileTaskConfig.setStartTime(startTime);
fileTaskConfig.setEndTime(endTime);
fileTaskConfig.setDataTimeFrom(startTime);
fileTaskConfig.setDataTimeTo(endTime);
fileTaskConfig.setDataContentStyle("CSV");
fileTaskConfig.setDataSeparator("|");
dataConfig.setExtParams(GSON.toJson(fileTaskConfig));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.constant.TaskConstants;
import org.apache.inlong.agent.plugin.utils.file.FileDataUtils;
import org.apache.inlong.agent.plugin.task.file.FileDataUtils;

import java.io.IOException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
import org.apache.inlong.agent.plugin.sources.file.extend.DefaultExtendedHandler;
import org.apache.inlong.agent.plugin.utils.file.FileDataUtils;
import org.apache.inlong.agent.plugin.task.file.FileDataUtils;
import org.apache.inlong.agent.utils.AgentUtils;

import org.slf4j.Logger;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.inlong.agent.plugin.utils.file;
package org.apache.inlong.agent.plugin.task.file;

import java.io.IOException;
import java.nio.file.Files;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,18 @@

package org.apache.inlong.agent.plugin.task.file;

import org.apache.inlong.agent.plugin.utils.file.FilePathUtil;
import org.apache.inlong.agent.plugin.utils.file.FileTimeComparator;
import org.apache.inlong.agent.plugin.utils.file.Files;
import org.apache.inlong.agent.plugin.utils.file.NewDateUtils;
import org.apache.inlong.agent.plugin.utils.regex.PatternUtil;
import org.apache.inlong.agent.plugin.utils.regex.Scanner;
import org.apache.inlong.agent.plugin.utils.regex.Scanner.FinalPatternInfo;
import org.apache.inlong.agent.utils.DateTransUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_FILE_MAX_NUM;

Expand All @@ -52,57 +48,24 @@ public BasicFileInfo(String fileName, String dataTime) {
this.fileName = fileName;
this.dataTime = dataTime;
}

}

private static final Logger logger = LoggerFactory.getLogger(FileScanner.class);

/**
 * Produces the list of data-time strings between two instants.
 *
 * When {@code isRetry} is false, both bounds are first shifted by
 * {@code DateTransUtils.calcOffset(timeOffset)}; for retries the bounds are
 * used as given. The (possibly shifted) range is expanded with
 * {@code NewDateUtils.getDateRegion}, and each returned time is formatted
 * with {@code DateTransUtils.millSecConvertToTimeStr} for the cycle unit.
 *
 * NOTE(review): this method appears in a hunk that shrinks the file and
 * whose import of NewDateUtils is listed next to imports from the new
 * {@code utils.regex} package, so it is presumably on the removed side of
 * the diff. Confirm against the commit.
 *
 * @param startTime  range start, passed to getDateRegion (after the optional shift)
 * @param endTime    range end, passed to getDateRegion (after the optional shift)
 * @param cycleUnit  cycle unit passed to getDateRegion and to the formatter
 * @param timeOffset offset expression given to DateTransUtils.calcOffset
 * @param isRetry    when true, the offset is not applied
 * @return one formatted data-time string per entry returned by getDateRegion
 */
public static List<String> getDataTimeList(long startTime, long endTime, String cycleUnit, String timeOffset,
boolean isRetry) {
if (!isRetry) {
startTime += DateTransUtils.calcOffset(timeOffset);
endTime += DateTransUtils.calcOffset(timeOffset);
}
List<String> dataTimeList = new ArrayList<>();
List<Long> dateRegion = NewDateUtils.getDateRegion(startTime, endTime, cycleUnit);
for (Long time : dateRegion) {
String dataTime = DateTransUtils.millSecConvertToTimeStr(time, cycleUnit);
dataTimeList.add(dataTime);
}
return dataTimeList;
}

public static List<BasicFileInfo> scanTaskBetweenTimes(String originPattern, String cycleUnit, String timeOffset,
long startTime, long endTime, boolean isRetry) {
if (!isRetry) {
startTime += DateTransUtils.calcOffset(timeOffset);
endTime += DateTransUtils.calcOffset(timeOffset);
}
String strStartTime = DateTransUtils.millSecConvertToTimeStr(startTime, cycleUnit);
String strEndTime = DateTransUtils.millSecConvertToTimeStr(endTime, cycleUnit);
logger.info("{} scan time is between {} and {}",
new Object[]{originPattern, strStartTime, strEndTime});

return scanTaskBetweenTimes(cycleUnit, originPattern, startTime, endTime);
}

/* Scan log files and create tasks between two times. */
public static List<BasicFileInfo> scanTaskBetweenTimes(String cycleUnit, String originPattern, long startTime,
long endTime) {
List<Long> dateRegion = NewDateUtils.getDateRegion(startTime, endTime, cycleUnit);
List<BasicFileInfo> infos = new ArrayList<BasicFileInfo>();
for (Long time : dateRegion) {
Calendar calendar = Calendar.getInstance();
calendar.setTimeInMillis(time);
String fileName = NewDateUtils.replaceDateExpression(calendar, originPattern);
ArrayList<String> allPaths = FilePathUtil.cutDirectoryByWildcard(fileName);
List<BasicFileInfo> infos = new ArrayList<>();
List<FinalPatternInfo> finalPatternInfos = Scanner.getFinalPatternInfos(originPattern, cycleUnit, timeOffset,
startTime, endTime, isRetry);
for (FinalPatternInfo finalPatternInfo : finalPatternInfos) {
ArrayList<String> allPaths = PatternUtil.cutDirectoryByWildcard(finalPatternInfo.finalPattern);
String firstDir = allPaths.get(0);
String secondDir = allPaths.get(0) + File.separator + allPaths.get(1);
ArrayList<String> fileList = getUpdatedOrNewFiles(firstDir, secondDir, fileName, 3,
ArrayList<String> fileList = getUpdatedOrNewFiles(firstDir, secondDir, finalPatternInfo.finalPattern, 3,
DEFAULT_FILE_MAX_NUM);
for (String file : fileList) {
// TODO the time is not YYYYMMDDHH
String dataTime = DateTransUtils.millSecConvertToTimeStr(time, cycleUnit);
String dataTime = DateTransUtils.millSecConvertToTimeStr(finalPatternInfo.dataTime, cycleUnit);
BasicFileInfo info = new BasicFileInfo(file, dataTime);
logger.info("scan new task fileName {} ,dataTime {}", file, dataTime);
infos.add(info);
Expand Down Expand Up @@ -134,34 +97,4 @@ private static ArrayList<String> getUpdatedOrNewFiles(String firstDir, String se
}
return ret;
}

/**
 * Lists files in a directory whose names match a pattern derived from
 * {@code logFileName}, returning at most {@code maxFileNum} absolute paths.
 *
 * The name is split by
 * {@code FilePathUtil.cutDirectoryByWildcardAndDateExpression}; elements 0
 * and 1 are joined with the file separator to form the directory to list,
 * and element 2 is compiled as a case-insensitive regular expression that is
 * matched against each file's name.
 *
 * NOTE(review): {@code File.listFiles()} returns null when the path is not a
 * directory or an I/O error occurs, in which case the for-loop below would
 * throw a NullPointerException. The method is marked unused and appears to
 * be on the removed side of this diff hunk. Confirm against the commit.
 *
 * @param logFileName file-name expression to split into directory parts and a name pattern
 * @param maxFileNum  upper bound on the number of paths collected
 * @return absolute paths of the matching files, in listing order
 */
@SuppressWarnings("unused")
private static ArrayList<String> getUpdatedOrNewFiles(String logFileName,
int maxFileNum) {
ArrayList<String> ret = new ArrayList<String>();
ArrayList<String> directories = FilePathUtil
.cutDirectoryByWildcardAndDateExpression(logFileName);
String parentDir = directories.get(0) + File.separator
+ directories.get(1);

Pattern pattern = Pattern.compile(directories.get(2),
Pattern.CASE_INSENSITIVE);
for (File file : new File(parentDir).listFiles()) {
Matcher matcher = pattern.matcher(file.getName());
// Files are still iterated after the cap is reached, but no more paths are added.
if (matcher.matches() && ret.size() < maxFileNum) {
ret.add(file.getAbsolutePath());
}
}
return ret;
}

/**
 * Manual entry point: calls the five-argument
 * {@code FileScanner.getUpdatedOrNewFiles} with hard-coded Windows-style
 * paths and prints each returned file name to standard output.
 *
 * NOTE(review): appears to be on the removed side of this diff hunk (the
 * hunk ends with the class's closing brace right after it). Confirm against
 * the commit.
 *
 * @param args unused
 */
public static void main(String[] args) {

ArrayList<String> fileList = FileScanner.getUpdatedOrNewFiles(
"f:\\\\abc", "f:\\\\abc\\\\", "f:\\\\abc\\\\1.txt", 3, 100);
// fileList = FileScanner.getUpdatedOrNewFiles("F:\\abc\\1.txt", 100);
for (String fileName : fileList) {
System.out.println(fileName);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.inlong.agent.plugin.utils.file;
package org.apache.inlong.agent.plugin.task.file;

import java.io.File;
import java.util.Comparator;
Expand Down
Loading

0 comments on commit 81fb821

Please sign in to comment.