diff --git a/thirdeye-integration-tests/src/test/java/ai/startree/thirdeye/SchedulingTest.java b/thirdeye-integration-tests/src/test/java/ai/startree/thirdeye/SchedulingTest.java index d7d9600ce1..80287ec733 100644 --- a/thirdeye-integration-tests/src/test/java/ai/startree/thirdeye/SchedulingTest.java +++ b/thirdeye-integration-tests/src/test/java/ai/startree/thirdeye/SchedulingTest.java @@ -19,18 +19,25 @@ import static ai.startree.thirdeye.ThirdEyeTestClient.ALERT_LIST_TYPE; import static ai.startree.thirdeye.ThirdEyeTestClient.ANOMALIES_LIST_TYPE; import static ai.startree.thirdeye.ThirdEyeTestClient.DATASOURCE_LIST_TYPE; +import static ai.startree.thirdeye.ThirdEyeTestClient.TASK_LIST_TYPE; import static org.assertj.core.api.Assertions.assertThat; import ai.startree.thirdeye.aspect.TimeProvider; import ai.startree.thirdeye.spi.api.AlertApi; import ai.startree.thirdeye.spi.api.AnomalyApi; import ai.startree.thirdeye.spi.api.DataSourceApi; +import ai.startree.thirdeye.spi.api.TaskApi; +import ai.startree.thirdeye.spi.task.TaskStatus; +import ai.startree.thirdeye.spi.task.TaskSubType; +import ai.startree.thirdeye.spi.task.TaskType; import jakarta.ws.rs.client.Entity; import jakarta.ws.rs.core.MultivaluedHashMap; import jakarta.ws.rs.core.MultivaluedMap; import jakarta.ws.rs.core.Response; import java.io.IOException; +import java.util.Comparator; import java.util.List; +import java.util.Set; import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,6 +78,7 @@ public class SchedulingTest { private static final long MARCH_26_2020_05H00 = 1585198800_000L; // = MARCH_26_2020_05H00 - delay P3D and floor granularity P1D (see config in alert json) private static final long MARCH_23_2020_00H00 = 1584921600_000L; + public static final Set TASK_PENDING_STATUSES = Set.of(TaskStatus.WAITING, TaskStatus.RUNNING); static { try { @@ -86,6 +94,7 @@ public class SchedulingTest { private ThirdEyeTestClient client; private long alertId; 
private DataSourceApi pinotDataSourceApi; + private Long onboardingTaskId; @BeforeClass public void beforeClass() throws Exception { @@ -144,11 +153,23 @@ public void testCreateAlertLastTimestamp() { } @Test(dependsOnMethods = "testCreateAlertLastTimestamp", timeOut = 60000L) + public void testTaskIsCreated() throws Exception { + final List tasks = getTasks(); + assertThat(tasks).hasSize(1); + final TaskApi task = tasks.getFirst(); + assertThat(task.getTaskType()).isEqualTo(TaskType.DETECTION); + assertThat(task.getTaskSubType()).isEqualTo(TaskSubType.DETECTION_HISTORICAL_DATA_AFTER_CREATE); + onboardingTaskId = task.getId(); + } + + @Test(dependsOnMethods = "testTaskIsCreated", timeOut = 60000L) public void testOnboardingLastTimestamp() throws Exception { - // wait for anomalies - proxy to know when the onboarding task has run - while (getAnomalies().isEmpty()) { + // wait for onboarding task to be completed + TaskApi onboardingTask = getTask(onboardingTaskId); + while (TASK_PENDING_STATUSES.contains(onboardingTask.getStatus())) { // see taskDriver server config for optimization Thread.sleep(1000); + onboardingTask = getTask(onboardingTaskId); } // check that lastTimestamp is the endTime of the Onboarding task: March 21 1H @@ -158,9 +179,6 @@ public void testOnboardingLastTimestamp() throws Exception { @Test(dependsOnMethods = "testOnboardingLastTimestamp", timeOut = 60000L) public void testAfterDetectionCronLastTimestamp() throws InterruptedException { - // get current number of anomalies - final int numAnomaliesBeforeDetectionRun = getAnomalies().size(); - // advance detection time to March 22, 2020, 00:00:00 UTC // this should trigger the cron - and a new anomaly is expected on [March 21 - March 22] CLOCK.useMockTime(MARCH_25_2020_05H00); @@ -169,10 +187,13 @@ public void testAfterDetectionCronLastTimestamp() throws InterruptedException { // give thread to detectionCronScheduler and to quartz scheduler - (quartz idle time is weaved to 100 ms for test speed) 
Thread.sleep(1000); - // wait for the new anomaly to be created - proxy to know when the detection has run - while (getAnomalies().size() == numAnomaliesBeforeDetectionRun) { + // wait for the new task to be created - proxy to know when the detection is triggered + List tasks = getTasks(); + while (tasks.size() == 1 || TASK_PENDING_STATUSES.contains(tasks.getLast().getStatus())) { Thread.sleep(1000); + tasks = getTasks(); } + assertThat(tasks.getLast().getTaskSubType()).isEqualTo(TaskSubType.DETECTION_TRIGGERED_BY_CRON); // check that lastTimestamp after detection is the runTime of the cron final long alertLastTimestamp = getAlertLastTimestamp(); @@ -221,6 +242,20 @@ private List getAnomalies() { return response.readEntity(ANOMALIES_LIST_TYPE); } + private List getTasks() { + final Response response = client.request("api/tasks").get(); + assert200(response); + final List taskApis = response.readEntity(TASK_LIST_TYPE); + taskApis.sort(Comparator.comparingLong(TaskApi::getId)); + return taskApis; + } + + private TaskApi getTask(final long id) { + final Response response = client.request("api/tasks/" + id).get(); + assert200(response); + return response.readEntity(TaskApi.class); + } + private long getAlertLastTimestamp() { final Response getResponse = client.request("api/alerts/" + alertId).get(); assertThat(getResponse.getStatus()).isEqualTo(200); diff --git a/thirdeye-persistence/src/main/java/ai/startree/thirdeye/datalayer/bao/TaskManagerImpl.java b/thirdeye-persistence/src/main/java/ai/startree/thirdeye/datalayer/bao/TaskManagerImpl.java index 1a243fb701..922f7fde6d 100644 --- a/thirdeye-persistence/src/main/java/ai/startree/thirdeye/datalayer/bao/TaskManagerImpl.java +++ b/thirdeye-persistence/src/main/java/ai/startree/thirdeye/datalayer/bao/TaskManagerImpl.java @@ -29,6 +29,7 @@ import ai.startree.thirdeye.spi.datalayer.dto.TaskDTO; import ai.startree.thirdeye.spi.task.TaskInfo; import ai.startree.thirdeye.spi.task.TaskStatus; +import 
ai.startree.thirdeye.spi.task.TaskSubType; import ai.startree.thirdeye.spi.task.TaskType; import com.fasterxml.jackson.core.JsonProcessingException; import com.google.inject.Inject; @@ -70,8 +71,8 @@ public TaskManagerImpl(final TaskDao dao) { } @Override - public TaskDTO createTaskDto(final TaskInfo taskInfo, final TaskType taskType, final - AuthorizationConfigurationDTO auth) { + public TaskDTO createTaskDto(final TaskInfo taskInfo, final TaskType taskType, + final TaskSubType taskSubType, final AuthorizationConfigurationDTO auth) { final String taskInfoJson; try { taskInfoJson = VANILLA_OBJECT_MAPPER.writeValueAsString(taskInfo); @@ -81,6 +82,7 @@ public TaskDTO createTaskDto(final TaskInfo taskInfo, final TaskType taskType, f final TaskDTO task = new TaskDTO() .setTaskType(taskType) + .setTaskSubType(taskSubType) .setJobName(taskType.toString() + "_" + taskInfo.getRefId()) .setStatus(TaskStatus.WAITING) .setTaskInfo(taskInfoJson) diff --git a/thirdeye-scheduler/src/main/java/ai/startree/thirdeye/scheduler/job/DetectionPipelineJob.java b/thirdeye-scheduler/src/main/java/ai/startree/thirdeye/scheduler/job/DetectionPipelineJob.java index b147cd8f0f..8b5fa91a23 100644 --- a/thirdeye-scheduler/src/main/java/ai/startree/thirdeye/scheduler/job/DetectionPipelineJob.java +++ b/thirdeye-scheduler/src/main/java/ai/startree/thirdeye/scheduler/job/DetectionPipelineJob.java @@ -16,6 +16,7 @@ import static ai.startree.thirdeye.scheduler.JobUtils.BACKPRESSURE_COUNTERS; import static ai.startree.thirdeye.scheduler.JobUtils.FAILED_TASK_CREATION_COUNTERS; import static ai.startree.thirdeye.scheduler.JobUtils.getIdFromJobKey; +import static ai.startree.thirdeye.spi.task.TaskSubType.DETECTION_TRIGGERED_BY_CRON; import static ai.startree.thirdeye.spi.task.TaskType.DETECTION; import static ai.startree.thirdeye.spi.util.AlertMetadataUtils.getDateTimeZone; import static ai.startree.thirdeye.spi.util.SpiUtils.optional; @@ -29,7 +30,6 @@ import 
ai.startree.thirdeye.spi.datalayer.dto.DatasetConfigDTO; import ai.startree.thirdeye.spi.datalayer.dto.DetectionPipelineTaskInfo; import ai.startree.thirdeye.spi.datalayer.dto.TaskDTO; -import ai.startree.thirdeye.spi.task.TaskType; import ai.startree.thirdeye.spi.util.TimeUtils; import com.google.common.annotations.VisibleForTesting; import com.google.inject.Inject; @@ -87,9 +87,9 @@ public void execute(JobExecutionContext ctx) { BACKPRESSURE_COUNTERS.get(DETECTION).increment(); return; } - final TaskDTO taskDTO = taskManager.createTaskDto(taskInfo, TaskType.DETECTION, - alert.getAuth()); - LOG.info("Created {} task {} with settings {}", TaskType.DETECTION, taskDTO.getId(), + final TaskDTO taskDTO = taskManager.createTaskDto(taskInfo, DETECTION, + DETECTION_TRIGGERED_BY_CRON, alert.getAuth()); + LOG.info("Created {} task {} with settings {}", DETECTION, taskDTO.getId(), taskDTO); } catch (Exception e) { LOG.error( diff --git a/thirdeye-scheduler/src/main/java/ai/startree/thirdeye/scheduler/job/NotificationPipelineJob.java b/thirdeye-scheduler/src/main/java/ai/startree/thirdeye/scheduler/job/NotificationPipelineJob.java index a93cc50cce..45c6df52be 100644 --- a/thirdeye-scheduler/src/main/java/ai/startree/thirdeye/scheduler/job/NotificationPipelineJob.java +++ b/thirdeye-scheduler/src/main/java/ai/startree/thirdeye/scheduler/job/NotificationPipelineJob.java @@ -16,6 +16,7 @@ import static ai.startree.thirdeye.scheduler.JobUtils.BACKPRESSURE_COUNTERS; import static ai.startree.thirdeye.scheduler.JobUtils.FAILED_TASK_CREATION_COUNTERS; import static ai.startree.thirdeye.scheduler.JobUtils.getIdFromJobKey; +import static ai.startree.thirdeye.spi.task.TaskSubType.NOTIFICATION_TRIGGERED_BY_CRON; import static ai.startree.thirdeye.spi.task.TaskType.NOTIFICATION; import ai.startree.thirdeye.spi.datalayer.bao.SubscriptionGroupManager; @@ -67,7 +68,7 @@ public void execute(final JobExecutionContext ctx) { BACKPRESSURE_COUNTERS.get(NOTIFICATION).increment(); return; } - 
final TaskDTO t = taskManager.createTaskDto(taskInfo, NOTIFICATION, subscriptionGroup.getAuth()); + final TaskDTO t = taskManager.createTaskDto(taskInfo, NOTIFICATION, NOTIFICATION_TRIGGERED_BY_CRON, subscriptionGroup.getAuth()); LOG.info("Created {} task {}. taskInfo: {}", NOTIFICATION, t.getId(), t); } catch (Exception e) { LOG.error("Exception running notification pipeline job {}. Notification task will not be scheduled.", ctx.getJobDetail().getKey().getName(), e); diff --git a/thirdeye-server/src/main/java/ai/startree/thirdeye/service/AlertService.java b/thirdeye-server/src/main/java/ai/startree/thirdeye/service/AlertService.java index 96fc1ac4ad..0c566f02a6 100644 --- a/thirdeye-server/src/main/java/ai/startree/thirdeye/service/AlertService.java +++ b/thirdeye-server/src/main/java/ai/startree/thirdeye/service/AlertService.java @@ -56,6 +56,7 @@ import ai.startree.thirdeye.spi.datalayer.dto.RcaInvestigationDTO; import ai.startree.thirdeye.spi.datalayer.dto.SubscriptionGroupDTO; import ai.startree.thirdeye.spi.datalayer.dto.TaskDTO; +import ai.startree.thirdeye.spi.task.TaskSubType; import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; import com.google.inject.Singleton; @@ -174,7 +175,7 @@ protected void postCreate(final AlertDTO dto) { // run the detection task on the historical data // note: the alert will not be initialized if it has isActive to false // FIXME cyril - should we run a dummy task like in postupdate to refresh enumeration items quickly? - createDetectionTask(dto, dto.getLastTimestamp(), System.currentTimeMillis()); + createDetectionTask(dto, dto.getLastTimestamp(), System.currentTimeMillis(), TaskSubType.DETECTION_HISTORICAL_DATA_AFTER_CREATE); } } @@ -189,11 +190,11 @@ protected void postUpdate(final ThirdEyePrincipal principal, final AlertDTO dto) * In this case the start and end timestamp is the same to ensure that we update the enumeration * items but we don't actually run the detection task. 
*/ - createDetectionTask(dto, dto.getLastTimestamp(), dto.getLastTimestamp()); + createDetectionTask(dto, dto.getLastTimestamp(), dto.getLastTimestamp(), TaskSubType.DETECTION_HISTORICAL_DATA_AFTER_UPDATE); // perform a soft-reset - rerun the detection on the whole historical data - existing and new anomalies will be merged // note: the 2 detection tasks can run concurrently, the order does not matter because the last timestamp after the run of the 2 tasks is the same // we could remove the first one but this would make the UI feel less snappy, because a new enumeration would not appear until the full historical replay is finished - createDetectionTask(dto, minimumLastTimestamp(principal, dto), dto.getLastTimestamp()); + createDetectionTask(dto, minimumLastTimestamp(principal, dto), dto.getLastTimestamp(), TaskSubType.DETECTION_HISTORICAL_DATA_AFTER_UPDATE); } } @@ -226,7 +227,7 @@ public void runTask( authorizationManager.ensureNamespace(principal, dto); authorizationManager.ensureCanEdit(principal, dto, dto); - createDetectionTask(dto, startTime, safeEndTime(endTime)); + createDetectionTask(dto, startTime, safeEndTime(endTime), TaskSubType.DETECTION_HISTORICAL_DATA_MANUAL); } private long safeEndTime(final @Nullable Long endTime) { @@ -415,14 +416,15 @@ private long minimumLastTimestamp(final ThirdEyePrincipal principal, final Alert } } - private void createDetectionTask(final AlertDTO alertDto, final long start, final long end) { + private void createDetectionTask(final AlertDTO alertDto, final long start, final long end, final + TaskSubType taskSubType) { checkArgument(alertDto.getId() != null && alertDto.getId() >= 0); checkArgument(start <= end); final DetectionPipelineTaskInfo info = new DetectionPipelineTaskInfo(alertDto.getId(), start, end); try { - final TaskDTO t = taskManager.createTaskDto(info, DETECTION, alertDto.getAuth()); + final TaskDTO t = taskManager.createTaskDto(info, DETECTION, taskSubType, alertDto.getAuth()); LOG.info("Created {} task {} 
with settings {}", DETECTION, t.getId(), t); } catch (final Exception e) { FAILED_TASK_CREATION_COUNTERS.get(DETECTION).increment(); diff --git a/thirdeye-spi/src/main/java/ai/startree/thirdeye/spi/api/TaskApi.java b/thirdeye-spi/src/main/java/ai/startree/thirdeye/spi/api/TaskApi.java index 84e1443eee..572327a74a 100644 --- a/thirdeye-spi/src/main/java/ai/startree/thirdeye/spi/api/TaskApi.java +++ b/thirdeye-spi/src/main/java/ai/startree/thirdeye/spi/api/TaskApi.java @@ -14,6 +14,7 @@ package ai.startree.thirdeye.spi.api; import ai.startree.thirdeye.spi.task.TaskStatus; +import ai.startree.thirdeye.spi.task.TaskSubType; import ai.startree.thirdeye.spi.task.TaskType; import java.sql.Timestamp; import java.util.Date; @@ -24,6 +25,7 @@ public class TaskApi implements ThirdEyeCrudApi { private Date created; private Date updated; private TaskType taskType; + private TaskSubType taskSubType; private Long workerId; private JobApi job; private TaskStatus status; @@ -167,4 +169,13 @@ public TaskApi setAuth( this.auth = auth; return this; } + + public TaskSubType getTaskSubType() { + return taskSubType; + } + + public TaskApi setTaskSubType(final TaskSubType taskSubType) { + this.taskSubType = taskSubType; + return this; + } } diff --git a/thirdeye-spi/src/main/java/ai/startree/thirdeye/spi/datalayer/bao/TaskManager.java b/thirdeye-spi/src/main/java/ai/startree/thirdeye/spi/datalayer/bao/TaskManager.java index 9ad5ef1a48..733414fa16 100644 --- a/thirdeye-spi/src/main/java/ai/startree/thirdeye/spi/datalayer/bao/TaskManager.java +++ b/thirdeye-spi/src/main/java/ai/startree/thirdeye/spi/datalayer/bao/TaskManager.java @@ -17,6 +17,7 @@ import ai.startree.thirdeye.spi.datalayer.dto.TaskDTO; import ai.startree.thirdeye.spi.task.TaskInfo; import ai.startree.thirdeye.spi.task.TaskStatus; +import ai.startree.thirdeye.spi.task.TaskSubType; import ai.startree.thirdeye.spi.task.TaskType; import java.sql.Timestamp; import java.time.Duration; @@ -31,7 +32,7 @@ public interface TaskManager 
extends AbstractManager { TaskDTO createTaskDto(final TaskInfo taskInfo, final TaskType taskType, - final AuthorizationConfigurationDTO auth) throws Exception; + final TaskSubType taskSubType, final AuthorizationConfigurationDTO auth) throws Exception; @Deprecated // use acquireNextTaskToRun instead TaskDTO findNextTaskToRun(); diff --git a/thirdeye-spi/src/main/java/ai/startree/thirdeye/spi/datalayer/dto/TaskDTO.java b/thirdeye-spi/src/main/java/ai/startree/thirdeye/spi/datalayer/dto/TaskDTO.java index e00083118b..6d8609dcab 100644 --- a/thirdeye-spi/src/main/java/ai/startree/thirdeye/spi/datalayer/dto/TaskDTO.java +++ b/thirdeye-spi/src/main/java/ai/startree/thirdeye/spi/datalayer/dto/TaskDTO.java @@ -14,6 +14,7 @@ package ai.startree.thirdeye.spi.datalayer.dto; import ai.startree.thirdeye.spi.task.TaskStatus; +import ai.startree.thirdeye.spi.task.TaskSubType; import ai.startree.thirdeye.spi.task.TaskType; import java.sql.Timestamp; import java.util.Objects; @@ -26,6 +27,7 @@ public class TaskDTO extends AbstractDTO { private TaskType taskType; + private TaskSubType taskSubType; private Long workerId; private Long jobId; private String jobName; @@ -164,4 +166,13 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(getId(), status, startTime, endTime, taskInfo); } + + public TaskSubType getTaskSubType() { + return taskSubType; + } + + public TaskDTO setTaskSubType(final TaskSubType taskSubType) { + this.taskSubType = taskSubType; + return this; + } } diff --git a/thirdeye-spi/src/main/java/ai/startree/thirdeye/spi/task/TaskSubType.java b/thirdeye-spi/src/main/java/ai/startree/thirdeye/spi/task/TaskSubType.java new file mode 100644 index 0000000000..bc0d2c071a --- /dev/null +++ b/thirdeye-spi/src/main/java/ai/startree/thirdeye/spi/task/TaskSubType.java @@ -0,0 +1,28 @@ +/* + * Copyright 2024 StarTree Inc + * + * Licensed under the StarTree Community License (the "License"); you may not use + * this file except in compliance with the 
License. You may obtain a copy of the + * License at http://www.startree.ai/legal/startree-community-license + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OF ANY KIND, + * either express or implied. + * See the License for the specific language governing permissions and limitations under + * the License. + */ +package ai.startree.thirdeye.spi.task; + +/** + * Provides finer grained information about the task type. + * Note: prefix the TaskSubType with the task type. + * TaskTypes should not share TaskSubTypes. + * For instance, use NOTIFICATION_MANUAL and DETECTION_MANUAL, not MANUAL. + */ +public enum TaskSubType { + DETECTION_HISTORICAL_DATA_AFTER_CREATE, /* detection over historical data, created by AlertService.postCreate */ + DETECTION_HISTORICAL_DATA_AFTER_UPDATE, /* detection tasks created by AlertService.postUpdate (enumeration refresh and historical replay) */ + DETECTION_HISTORICAL_DATA_MANUAL, /* detection over a caller-supplied time range, created by AlertService.runTask */ + DETECTION_TRIGGERED_BY_CRON, /* detection task created by the scheduler's DetectionPipelineJob */ + NOTIFICATION_TRIGGERED_BY_CRON /* notification task created by the scheduler's NotificationPipelineJob */ +}