Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[spi] add TaskSubType #1759

Merged
merged 4 commits into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,25 @@
import static ai.startree.thirdeye.ThirdEyeTestClient.ALERT_LIST_TYPE;
import static ai.startree.thirdeye.ThirdEyeTestClient.ANOMALIES_LIST_TYPE;
import static ai.startree.thirdeye.ThirdEyeTestClient.DATASOURCE_LIST_TYPE;
import static ai.startree.thirdeye.ThirdEyeTestClient.TASK_LIST_TYPE;
import static org.assertj.core.api.Assertions.assertThat;

import ai.startree.thirdeye.aspect.TimeProvider;
import ai.startree.thirdeye.spi.api.AlertApi;
import ai.startree.thirdeye.spi.api.AnomalyApi;
import ai.startree.thirdeye.spi.api.DataSourceApi;
import ai.startree.thirdeye.spi.api.TaskApi;
import ai.startree.thirdeye.spi.task.TaskStatus;
import ai.startree.thirdeye.spi.task.TaskSubType;
import ai.startree.thirdeye.spi.task.TaskType;
import jakarta.ws.rs.client.Entity;
import jakarta.ws.rs.core.MultivaluedHashMap;
import jakarta.ws.rs.core.MultivaluedMap;
import jakarta.ws.rs.core.Response;
import java.io.IOException;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -71,6 +78,7 @@ public class SchedulingTest {
private static final long MARCH_26_2020_05H00 = 1585198800_000L;
// = MARCH_26_2020_05H00 - delay P3D and floor granularity P1D (see config in alert json)
private static final long MARCH_23_2020_00H00 = 1584921600_000L;
public static final Set<TaskStatus> TASK_PENDING_STATUSES = Set.of(TaskStatus.WAITING, TaskStatus.RUNNING);

static {
try {
Expand All @@ -86,6 +94,7 @@ public class SchedulingTest {
private ThirdEyeTestClient client;
private long alertId;
private DataSourceApi pinotDataSourceApi;
private Long onboardingTaskId;

@BeforeClass
public void beforeClass() throws Exception {
Expand Down Expand Up @@ -144,11 +153,23 @@ public void testCreateAlertLastTimestamp() {
}

@Test(dependsOnMethods = "testCreateAlertLastTimestamp", timeOut = 60000L)
public void testTaskIsCreated() throws Exception {
final List<TaskApi> tasks = getTasks();
assertThat(tasks).hasSize(1);
final TaskApi task = tasks.getFirst();
assertThat(task.getTaskType()).isEqualTo(TaskType.DETECTION);
assertThat(task.getTaskSubType()).isEqualTo(TaskSubType.DETECTION_HISTORICAL_DATA_AFTER_CREATE);
onboardingTaskId = task.getId();
}

@Test(dependsOnMethods = "testTaskIsCreated", timeOut = 60000L)
public void testOnboardingLastTimestamp() throws Exception {
// wait for anomalies - proxy to know when the onboarding task has run
while (getAnomalies().isEmpty()) {
// wait for onboarding task to be completed
TaskApi onboardingTask = getTask(onboardingTaskId);
while (TASK_PENDING_STATUSES.contains(onboardingTask.getStatus())) {
// see taskDriver server config for optimization
Thread.sleep(1000);
onboardingTask = getTask(onboardingTaskId);
}

// check that lastTimestamp is the endTime of the Onboarding task: March 21 1H
Expand All @@ -158,9 +179,6 @@ public void testOnboardingLastTimestamp() throws Exception {

@Test(dependsOnMethods = "testOnboardingLastTimestamp", timeOut = 60000L)
public void testAfterDetectionCronLastTimestamp() throws InterruptedException {
// get current number of anomalies
final int numAnomaliesBeforeDetectionRun = getAnomalies().size();

// advance detection time to March 22, 2020, 00:00:00 UTC
// this should trigger the cron - and a new anomaly is expected on [March 21 - March 22]
CLOCK.useMockTime(MARCH_25_2020_05H00);
Expand All @@ -169,10 +187,13 @@ public void testAfterDetectionCronLastTimestamp() throws InterruptedException {
// give thread to detectionCronScheduler and to quartz scheduler - (quartz idle time is weaved to 100 ms for test speed)
Thread.sleep(1000);

// wait for the new anomaly to be created - proxy to know when the detection has run
while (getAnomalies().size() == numAnomaliesBeforeDetectionRun) {
// wait for the new task to be created - proxy to know when the detection is triggered
List<TaskApi> tasks = getTasks();
while (tasks.size() == 1 || TASK_PENDING_STATUSES.contains(tasks.getLast().getStatus())) {
Thread.sleep(1000);
tasks = getTasks();
}
assertThat(tasks.getLast().getTaskSubType()).isEqualTo(TaskSubType.DETECTION_TRIGGERED_BY_CRON);

// check that lastTimestamp after detection is the runTime of the cron
final long alertLastTimestamp = getAlertLastTimestamp();
Expand Down Expand Up @@ -221,6 +242,20 @@ private List<AnomalyApi> getAnomalies() {
return response.readEntity(ANOMALIES_LIST_TYPE);
}

private List<TaskApi> getTasks() {
final Response response = client.request("api/tasks").get();
assert200(response);
final List<TaskApi> taskApis = response.readEntity(TASK_LIST_TYPE);
taskApis.sort(Comparator.comparingLong(TaskApi::getId));
return taskApis;
}

private TaskApi getTask(final long id) {
final Response response = client.request("api/tasks/" + id).get();
assert200(response);
return response.readEntity(TaskApi.class);
}

private long getAlertLastTimestamp() {
final Response getResponse = client.request("api/alerts/" + alertId).get();
assertThat(getResponse.getStatus()).isEqualTo(200);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import ai.startree.thirdeye.spi.datalayer.dto.TaskDTO;
import ai.startree.thirdeye.spi.task.TaskInfo;
import ai.startree.thirdeye.spi.task.TaskStatus;
import ai.startree.thirdeye.spi.task.TaskSubType;
import ai.startree.thirdeye.spi.task.TaskType;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.inject.Inject;
Expand Down Expand Up @@ -70,8 +71,8 @@ public TaskManagerImpl(final TaskDao dao) {
}

@Override
public TaskDTO createTaskDto(final TaskInfo taskInfo, final TaskType taskType, final
AuthorizationConfigurationDTO auth) {
public TaskDTO createTaskDto(final TaskInfo taskInfo, final TaskType taskType,
final TaskSubType taskSubType, final AuthorizationConfigurationDTO auth) {
final String taskInfoJson;
try {
taskInfoJson = VANILLA_OBJECT_MAPPER.writeValueAsString(taskInfo);
Expand All @@ -81,6 +82,7 @@ public TaskDTO createTaskDto(final TaskInfo taskInfo, final TaskType taskType, f

final TaskDTO task = new TaskDTO()
.setTaskType(taskType)
.setTaskSubType(taskSubType)
.setJobName(taskType.toString() + "_" + taskInfo.getRefId())
.setStatus(TaskStatus.WAITING)
.setTaskInfo(taskInfoJson)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import static ai.startree.thirdeye.scheduler.JobUtils.BACKPRESSURE_COUNTERS;
import static ai.startree.thirdeye.scheduler.JobUtils.FAILED_TASK_CREATION_COUNTERS;
import static ai.startree.thirdeye.scheduler.JobUtils.getIdFromJobKey;
import static ai.startree.thirdeye.spi.task.TaskSubType.DETECTION_TRIGGERED_BY_CRON;
import static ai.startree.thirdeye.spi.task.TaskType.DETECTION;
import static ai.startree.thirdeye.spi.util.AlertMetadataUtils.getDateTimeZone;
import static ai.startree.thirdeye.spi.util.SpiUtils.optional;
Expand All @@ -29,7 +30,6 @@
import ai.startree.thirdeye.spi.datalayer.dto.DatasetConfigDTO;
import ai.startree.thirdeye.spi.datalayer.dto.DetectionPipelineTaskInfo;
import ai.startree.thirdeye.spi.datalayer.dto.TaskDTO;
import ai.startree.thirdeye.spi.task.TaskType;
import ai.startree.thirdeye.spi.util.TimeUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
Expand Down Expand Up @@ -87,9 +87,9 @@ public void execute(JobExecutionContext ctx) {
BACKPRESSURE_COUNTERS.get(DETECTION).increment();
return;
}
final TaskDTO taskDTO = taskManager.createTaskDto(taskInfo, TaskType.DETECTION,
alert.getAuth());
LOG.info("Created {} task {} with settings {}", TaskType.DETECTION, taskDTO.getId(),
final TaskDTO taskDTO = taskManager.createTaskDto(taskInfo, DETECTION,
DETECTION_TRIGGERED_BY_CRON, alert.getAuth());
LOG.info("Created {} task {} with settings {}", DETECTION, taskDTO.getId(),
taskDTO);
} catch (Exception e) {
LOG.error(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import static ai.startree.thirdeye.scheduler.JobUtils.BACKPRESSURE_COUNTERS;
import static ai.startree.thirdeye.scheduler.JobUtils.FAILED_TASK_CREATION_COUNTERS;
import static ai.startree.thirdeye.scheduler.JobUtils.getIdFromJobKey;
import static ai.startree.thirdeye.spi.task.TaskSubType.NOTIFICATION_TRIGGERED_BY_CRON;
import static ai.startree.thirdeye.spi.task.TaskType.NOTIFICATION;

import ai.startree.thirdeye.spi.datalayer.bao.SubscriptionGroupManager;
Expand Down Expand Up @@ -67,7 +68,7 @@ public void execute(final JobExecutionContext ctx) {
BACKPRESSURE_COUNTERS.get(NOTIFICATION).increment();
return;
}
final TaskDTO t = taskManager.createTaskDto(taskInfo, NOTIFICATION, subscriptionGroup.getAuth());
final TaskDTO t = taskManager.createTaskDto(taskInfo, NOTIFICATION, NOTIFICATION_TRIGGERED_BY_CRON, subscriptionGroup.getAuth());
LOG.info("Created {} task {}. taskInfo: {}", NOTIFICATION, t.getId(), t);
} catch (Exception e) {
LOG.error("Exception running notification pipeline job {}. Notification task will not be scheduled.", ctx.getJobDetail().getKey().getName(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import ai.startree.thirdeye.spi.datalayer.dto.RcaInvestigationDTO;
import ai.startree.thirdeye.spi.datalayer.dto.SubscriptionGroupDTO;
import ai.startree.thirdeye.spi.datalayer.dto.TaskDTO;
import ai.startree.thirdeye.spi.task.TaskSubType;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import com.google.inject.Singleton;
Expand Down Expand Up @@ -174,7 +175,7 @@ protected void postCreate(final AlertDTO dto) {
// run the detection task on the historical data
// note: the alert will not be initialized if it has isActive to false
// FIXME cyril - should we run a dummy task like in postupdate to refresh enumeration items quickly?
createDetectionTask(dto, dto.getLastTimestamp(), System.currentTimeMillis());
createDetectionTask(dto, dto.getLastTimestamp(), System.currentTimeMillis(), TaskSubType.DETECTION_HISTORICAL_DATA_AFTER_CREATE);
}
}

Expand All @@ -189,11 +190,11 @@ protected void postUpdate(final ThirdEyePrincipal principal, final AlertDTO dto)
* In this case the start and end timestamp is the same to ensure that we update the enumeration
* items but we don't actually run the detection task.
*/
createDetectionTask(dto, dto.getLastTimestamp(), dto.getLastTimestamp());
createDetectionTask(dto, dto.getLastTimestamp(), dto.getLastTimestamp(), TaskSubType.DETECTION_HISTORICAL_DATA_AFTER_UPDATE);
// perform a soft-reset - rerun the detection on the whole historical data - existing and new anomalies will be merged
// note: the 2 detection tasks can run concurrently, the order does not matter because the last timestamp after the run of the 2 tasks is the same
// we could remove the first one but this would make the UI feel less snappy, because a new enumeration would not appear until the full historical replay is finished
createDetectionTask(dto, minimumLastTimestamp(principal, dto), dto.getLastTimestamp());
createDetectionTask(dto, minimumLastTimestamp(principal, dto), dto.getLastTimestamp(), TaskSubType.DETECTION_HISTORICAL_DATA_AFTER_UPDATE);
}
}

Expand Down Expand Up @@ -226,7 +227,7 @@ public void runTask(
authorizationManager.ensureNamespace(principal, dto);
authorizationManager.ensureCanEdit(principal, dto, dto);

createDetectionTask(dto, startTime, safeEndTime(endTime));
createDetectionTask(dto, startTime, safeEndTime(endTime), TaskSubType.DETECTION_HISTORICAL_DATA_MANUAL);
}

private long safeEndTime(final @Nullable Long endTime) {
Expand Down Expand Up @@ -415,14 +416,15 @@ private long minimumLastTimestamp(final ThirdEyePrincipal principal, final Alert
}
}

private void createDetectionTask(final AlertDTO alertDto, final long start, final long end) {
private void createDetectionTask(final AlertDTO alertDto, final long start, final long end, final
TaskSubType taskSubType) {
checkArgument(alertDto.getId() != null && alertDto.getId() >= 0);
checkArgument(start <= end);
final DetectionPipelineTaskInfo info = new DetectionPipelineTaskInfo(alertDto.getId(), start,
end);

try {
final TaskDTO t = taskManager.createTaskDto(info, DETECTION, alertDto.getAuth());
final TaskDTO t = taskManager.createTaskDto(info, DETECTION, taskSubType, alertDto.getAuth());
LOG.info("Created {} task {} with settings {}", DETECTION, t.getId(), t);
} catch (final Exception e) {
FAILED_TASK_CREATION_COUNTERS.get(DETECTION).increment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package ai.startree.thirdeye.spi.api;

import ai.startree.thirdeye.spi.task.TaskStatus;
import ai.startree.thirdeye.spi.task.TaskSubType;
import ai.startree.thirdeye.spi.task.TaskType;
import java.sql.Timestamp;
import java.util.Date;
Expand All @@ -24,6 +25,7 @@ public class TaskApi implements ThirdEyeCrudApi<TaskApi> {
private Date created;
private Date updated;
private TaskType taskType;
private TaskSubType taskSubType;
private Long workerId;
private JobApi job;
private TaskStatus status;
Expand Down Expand Up @@ -167,4 +169,13 @@ public TaskApi setAuth(
this.auth = auth;
return this;
}

public TaskSubType getTaskSubType() {
return taskSubType;
}

public TaskApi setTaskSubType(final TaskSubType taskSubType) {
this.taskSubType = taskSubType;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import ai.startree.thirdeye.spi.datalayer.dto.TaskDTO;
import ai.startree.thirdeye.spi.task.TaskInfo;
import ai.startree.thirdeye.spi.task.TaskStatus;
import ai.startree.thirdeye.spi.task.TaskSubType;
import ai.startree.thirdeye.spi.task.TaskType;
import java.sql.Timestamp;
import java.time.Duration;
Expand All @@ -31,7 +32,7 @@
public interface TaskManager extends AbstractManager<TaskDTO> {

TaskDTO createTaskDto(final TaskInfo taskInfo, final TaskType taskType,
final AuthorizationConfigurationDTO auth) throws Exception;
final TaskSubType taskSubType, final AuthorizationConfigurationDTO auth) throws Exception;

@Deprecated // use acquireNextTaskToRun instead
TaskDTO findNextTaskToRun();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package ai.startree.thirdeye.spi.datalayer.dto;

import ai.startree.thirdeye.spi.task.TaskStatus;
import ai.startree.thirdeye.spi.task.TaskSubType;
import ai.startree.thirdeye.spi.task.TaskType;
import java.sql.Timestamp;
import java.util.Objects;
Expand All @@ -26,6 +27,7 @@
public class TaskDTO extends AbstractDTO {

private TaskType taskType;
private TaskSubType taskSubType;
private Long workerId;
private Long jobId;
private String jobName;
Expand Down Expand Up @@ -164,4 +166,13 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(getId(), status, startTime, endTime, taskInfo);
}

public TaskSubType getTaskSubType() {
return taskSubType;
}

public TaskDTO setTaskSubType(final TaskSubType taskSubType) {
this.taskSubType = taskSubType;
return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2024 StarTree Inc
*
* Licensed under the StarTree Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at http://www.startree.ai/legal/startree-community-license
*
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OF ANY KIND,
* either express or implied.
* See the License for the specific language governing permissions and limitations under
* the License.
*/
package ai.startree.thirdeye.spi.task;

/**
* Provides finer grained information about the task type.
* Note: prefix the TaskSubType with the task type.
* TaskTypes should not share TaskSubTypes.
* For instance, use NOTIFICATION_MANUAL and DETECTION_MANUAL, not MANUAL.
*/
public enum TaskSubType {
DETECTION_HISTORICAL_DATA_AFTER_CREATE,
DETECTION_HISTORICAL_DATA_AFTER_UPDATE,
DETECTION_HISTORICAL_DATA_MANUAL,
DETECTION_TRIGGERED_BY_CRON,
NOTIFICATION_TRIGGERED_BY_CRON
}
Loading