[persistence] optimize task count operations and metrics (#1719)
* [persistence] optimize task count operations and metrics

* optimize latency metrics and delete

* optimize purge operation
cyrilou242 authored Dec 13, 2024
1 parent 0526c01 commit 8c23b7b
Showing 6 changed files with 69 additions and 38 deletions.
@@ -24,6 +24,7 @@
import ai.startree.thirdeye.spi.datalayer.DaoFilter;
import ai.startree.thirdeye.spi.datalayer.Predicate;
import ai.startree.thirdeye.spi.datalayer.bao.TaskManager;
import ai.startree.thirdeye.spi.datalayer.dto.AbstractDTO;
import ai.startree.thirdeye.spi.datalayer.dto.AuthorizationConfigurationDTO;
import ai.startree.thirdeye.spi.datalayer.dto.TaskDTO;
import ai.startree.thirdeye.spi.task.TaskInfo;
@@ -35,6 +36,7 @@
import com.google.inject.persist.Transactional;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Metrics;
import java.sql.Connection;
import java.sql.Timestamp;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
@@ -205,12 +207,15 @@ public void purge(@Nullable final Duration expiryDurationOptional,

final long startTime = System.nanoTime();
final List<TaskDTO> tasksToBeDeleted = filter(new DaoFilter()
// non-locking read
.setTransactionIsolationLevel(Connection.TRANSACTION_READ_UNCOMMITTED)
.setPredicate(Predicate.LT("createTime", formattedDate))
.setLimit((long) limit)
);

/* Delete each task */
tasksToBeDeleted.forEach(this::delete);
// locking but using primary key, should only lock the impacted rows
deleteByIds(tasksToBeDeleted.stream().map(AbstractDTO::getId).toList());

final double totalTime = (System.nanoTime() - startTime) / 1e9;

@@ -239,11 +244,7 @@ public void cleanupOrphanTasks(final Timestamp activeThreshold) {
);
}

public long countByStatus(final TaskStatus status) {
return count(Predicate.EQ("status", status.toString()));
}

public long countBy(final TaskStatus status, final TaskType type) {
private long countBy(final TaskStatus status, final TaskType type) {
return count(Predicate.AND(
Predicate.EQ("status", status.toString()),
Predicate.EQ("type", type)));
@@ -276,11 +277,13 @@ public void registerDatabaseMetrics() {
LOG.info("Registered task database metrics.");
}

// FIXME CYRIL - this should have as little cache as possible and be as precise as possible
// TODO CYRIL scale - compute this in database directly - for the moment we assume the filter is such that the number of tasks returned is small
// FIXME CYRIL - this should have as little cache as possible
// TODO CYRIL scale - compute this in database directly - for the moment we assume the filter is such that the number of tasks returned is small
// TODO CYRIL - may be simpler to perform a single group by query now that there is TRANSACTION_READ_UNCOMMITTED
private long getTaskLatency(final TaskType type, TaskStatus... pendingStatuses) {
// fetch pending tasks from DB
final DaoFilter filter = new DaoFilter()
.setTransactionIsolationLevel(Connection.TRANSACTION_READ_UNCOMMITTED)
.setPredicate(Predicate.AND(
Predicate.IN("status", pendingStatuses),
Predicate.EQ("type", type)
@@ -376,7 +379,9 @@ public int deleteRecordsOlderThanDays(final int days) {

@Override
public List<TaskDTO> findAll() {
return dao.getAll();
// this operation can stress the task execution framework - if there is no use case for it, keep it unsupported
// most use cases should use a filter by namespace
throw new UnsupportedOperationException("findAll operation is not supported for Tasks. If this error is unexpected, please reach out to support.");
}

@Override
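Since findAll() now throws, callers that previously listed every task are expected to scope their reads, typically by namespace. A minimal sketch of the replacement pattern (the taskManager variable and the namespace value are illustrative, not part of the diff):

    // Namespace-scoped listing instead of the now-unsupported findAll().
    // Pass null as the namespace value to target tasks without a namespace, as the updated tests do.
    final List<TaskDTO> namespacedTasks = taskManager.filter(new DaoFilter()
        .setPredicate(Predicate.EQ("namespace", "my-namespace")));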
@@ -65,7 +65,6 @@ AND ref_id not in (select ref_id from task_entity where status = 'RUNNING')
FOR UPDATE SKIP LOCKED
""";
private static final Logger LOG = LoggerFactory.getLogger(TaskDao.class);
private static final boolean IS_DEBUG = LOG.isDebugEnabled();

private final DatabaseOrm databaseOrm;
private final DatabaseClient databaseClient;
@@ -179,19 +178,6 @@ public int update(final TaskDTO pojo, final Predicate predicate) {
}
}

public List<TaskDTO> getAll() {
try {
final List<TaskEntity> entities = databaseClient.executeTransaction(
(connection) -> databaseOrm.findAll(null,
null, null, TaskEntity.class, connection));
return toDto(entities);
} catch (final Exception e) {
LOG.error(e.getMessage(), e);
// TODO CYRIL design - surface exception ?
return Collections.emptyList();
}
}

public List<TaskDTO> list(final long limit, final long offset) {
try {
final List<TaskEntity> entities = databaseClient.executeTransaction(
@@ -231,18 +217,27 @@ public List<TaskDTO> filter(final DaoFilter daoFilter) {
requireNonNull(daoFilter.getPredicate(),
"If the predicate is null, you can just do "
+ "getAll() which doesn't need to fetch IDs first");
return get(daoFilter.getPredicate(), daoFilter.getLimit());
return get(daoFilter.getPredicate(), daoFilter.getLimit(), daoFilter.getTransactionIsolationLevel());
}

public List<TaskDTO> get(final Predicate predicate) {
return get(predicate, null);
}

public List<TaskDTO> get(final Predicate predicate, @Nullable Long limit) {
public List<TaskDTO> get(final Predicate predicate, final @Nullable Long limit) {
return get(predicate, limit, null);
}

public List<TaskDTO> get(final Predicate predicate, final @Nullable Long limit, final @Nullable Integer transactionIsolationLevel) {
try {
final List<TaskEntity> entities = databaseClient.executeTransaction(
(connection) -> databaseOrm.findAll(
predicate, limit, null, TaskEntity.class, connection));
(connection) -> {
if (transactionIsolationLevel != null) {
connection.setTransactionIsolation(transactionIsolationLevel);
}
return databaseOrm.findAll(
predicate, limit, null, TaskEntity.class, connection);
});
return toDto(entities);
} catch (final Exception e) {
LOG.error(e.getMessage(), e);
@@ -254,7 +249,10 @@ public List<TaskDTO> get(final Predicate predicate, @Nullable Long limit) {
public long count() {
try {
return databaseClient.executeTransaction(
(connection) -> databaseOrm.count(null, TaskEntity.class, connection));
(connection) -> {
connection.setTransactionIsolation(Connection.TRANSACTION_READ_UNCOMMITTED);
return databaseOrm.count(null, TaskEntity.class, connection);
});
} catch (Exception e) {
LOG.error(e.getMessage(), e);
// TODO CYRIL design - surface exception ?
@@ -265,7 +263,10 @@ public long count() {
public long count(final Predicate predicate) {
try {
return databaseClient.executeTransaction(
(connection) -> databaseOrm.count(predicate, TaskEntity.class, connection));
(connection) -> {
connection.setTransactionIsolation(Connection.TRANSACTION_READ_UNCOMMITTED);
return databaseOrm.count(predicate, TaskEntity.class, connection);
});
} catch (Exception e) {
LOG.error(e.getMessage(), e);
// TODO CYRIL design - surface exception ?
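The isolation level set on the DaoFilter is forwarded by filter(DaoFilter) to the new three-argument get, which only touches the connection when a level is actually provided. A short usage sketch against the new overload (the taskDao variable, the predicate, and the limit are illustrative):

    // Non-locking read of up to 1000 RUNNING tasks through the new overload.
    // Passing null as the last argument keeps the connection's default isolation level.
    final List<TaskDTO> running = taskDao.get(
        Predicate.EQ("status", "RUNNING"),
        1000L,
        Connection.TRANSACTION_READ_UNCOMMITTED);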
@@ -14,9 +14,12 @@
package ai.startree.thirdeye.datalayer.bao;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import ai.startree.thirdeye.aspect.TimeProvider;
import ai.startree.thirdeye.datalayer.MySqlTestDatabase;
import ai.startree.thirdeye.spi.datalayer.DaoFilter;
import ai.startree.thirdeye.spi.datalayer.Predicate;
import ai.startree.thirdeye.spi.datalayer.bao.TaskManager;
import ai.startree.thirdeye.spi.datalayer.dto.TaskDTO;
import ai.startree.thirdeye.spi.task.TaskInfo;
@@ -47,6 +50,9 @@
*/
public class TestAnomalyTaskManager {

private static final DaoFilter ALL_IN_NULL_NAMESPACE = new DaoFilter().setPredicate(
Predicate.EQ("namespace", null));

private static final Set<TaskStatus> allowedOldTaskStatus = new HashSet<>();

private static final TimeProvider CLOCK = TimeProvider.instance();
@@ -73,7 +79,7 @@ void beforeClass() {
@AfterClass(alwaysRun = true)
public void afterClass() {
CLOCK.useSystemTime();
taskDAO.findAll().forEach(taskDAO::delete);
taskDAO.filter(ALL_IN_NULL_NAMESPACE).forEach(taskDAO::delete);
}

@Test
@@ -83,14 +89,19 @@ public void testCreate() throws JsonProcessingException {
anomalyTaskId2 = taskDAO.save(getTestTaskSpec( 2));
Assert.assertNotNull(anomalyTaskId2);
}

@Test
public void testFilterAllNotSupported() {
assertThatThrownBy(() -> taskDAO.findAll()).isInstanceOf(UnsupportedOperationException.class);
}

@Test(dependsOnMethods = {"testCreate"})
public void testFindAll() {
List<TaskDTO> anomalyTasks = taskDAO.findAll();
public void testFilterAllInANamespace() {
List<TaskDTO> anomalyTasks = taskDAO.filter(ALL_IN_NULL_NAMESPACE);
Assert.assertEquals(anomalyTasks.size(), 2);
}

@Test(dependsOnMethods = {"testFindAll"})
@Test(dependsOnMethods = {"testFilterAllInANamespace"})
public void testAcquireTaskToRun() throws Exception {
CLOCK.tick(1);
final Long workerId = 1L;
@@ -16,6 +16,7 @@
import static org.assertj.core.api.Assertions.assertThat;

import ai.startree.thirdeye.datalayer.MySqlTestDatabase;
import ai.startree.thirdeye.spi.datalayer.DaoFilter;
import ai.startree.thirdeye.spi.datalayer.Predicate;
import ai.startree.thirdeye.spi.datalayer.dto.TaskDTO;
import ai.startree.thirdeye.spi.task.TaskStatus;
@@ -28,6 +29,8 @@

public class TestTaskDao {

public static final DaoFilter ALL_IN_NULL_NAMESPACE = new DaoFilter().setPredicate(
Predicate.EQ("namespace", null));
private TaskDao dao;

@BeforeClass
@@ -38,7 +41,7 @@ void beforeClass() {

@AfterClass(alwaysRun = true)
public void afterClass() {
dao.getAll().forEach(task -> dao.delete(task.getId()));
dao.filter(ALL_IN_NULL_NAMESPACE).forEach(task -> dao.delete(task.getId()));
}

private TaskDTO buildTask() {
@@ -62,15 +65,15 @@ public void saveTest() {

@Test(dependsOnMethods = {"saveTest"})
public void updateTest() {
TaskDTO dto = dao.getAll().get(0);
TaskDTO dto = dao.filter(ALL_IN_NULL_NAMESPACE).get(0);
dto.setVersion(dto.getVersion()+1);
assertThat(dao.update(dto)).isEqualTo(1);
assertThat(dao.get(dto.getId()).getVersion()).isEqualTo(dto.getVersion());
}

@Test(dependsOnMethods = {"updateTest"})
public void deleteTest() {
List<TaskDTO> tasks = dao.getAll();
List<TaskDTO> tasks = dao.filter(ALL_IN_NULL_NAMESPACE);
assertThat(tasks.size()).isGreaterThan(0);
TaskDTO taskToDelete = tasks.get(0);
dao.delete(taskToDelete.getId());
@@ -23,6 +23,10 @@ public class DaoFilter {
private Long offset;
private String orderByKey;
private boolean isDesc = false;

// for instance: Connection.TRANSACTION_READ_UNCOMMITTED - if not set, the transaction isolation should not be set
// NOTE: this parameter is not supported by all DAOs yet
private Integer transactionIsolationLevel = null;

public Predicate getPredicate() {
return predicate;
@@ -78,4 +82,13 @@ public DaoFilter setDesc(final boolean desc) {
isDesc = desc;
return this;
}

public Integer getTransactionIsolationLevel() {
return transactionIsolationLevel;
}

public DaoFilter setTransactionIsolationLevel(final Integer transactionIsolationLevel) {
this.transactionIsolationLevel = transactionIsolationLevel;
return this;
}
}
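For callers, the new setter composes with the existing builder-style API, and DAOs that do not yet honor the field simply ignore it. A small illustration of a non-locking read filter (the createTime cutoff string is illustrative; the real purge code derives it from a DateFormat):

    // A filter requesting a dirty (non-locking) read of old tasks.
    // Leaving setTransactionIsolationLevel unset keeps the connection default.
    final DaoFilter oldTasksFilter = new DaoFilter()
        .setTransactionIsolationLevel(Connection.TRANSACTION_READ_UNCOMMITTED)
        .setPredicate(Predicate.LT("createTime", "2024-12-01 00:00:00"))
        .setLimit(10_000L);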
@@ -56,6 +56,4 @@ void updateStatusAndTaskEndTime(Long id, TaskStatus oldStatus, TaskStatus newSta
void purge(Duration expiryDuration, Integer limitOptional);

void cleanupOrphanTasks(Timestamp activeThreshold);

long countByStatus(final TaskStatus status);
}
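With countByStatus removed from the interface and countBy now private, a caller that still needs a status-only count can express it through the generic predicate-based count, assuming that method remains exposed by the base manager (it is what countByStatus delegated to):

    // Status-only count without the removed countByStatus helper.
    // The TaskStatus value and the taskManager variable are illustrative.
    final long waitingTasks = taskManager.count(
        Predicate.EQ("status", TaskStatus.WAITING.toString()));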
