diff --git a/thirdeye-persistence/src/main/java/ai/startree/thirdeye/datalayer/bao/TaskManagerImpl.java b/thirdeye-persistence/src/main/java/ai/startree/thirdeye/datalayer/bao/TaskManagerImpl.java index e1c46d24dd..f54107bf60 100644 --- a/thirdeye-persistence/src/main/java/ai/startree/thirdeye/datalayer/bao/TaskManagerImpl.java +++ b/thirdeye-persistence/src/main/java/ai/startree/thirdeye/datalayer/bao/TaskManagerImpl.java @@ -24,6 +24,7 @@ import ai.startree.thirdeye.spi.datalayer.DaoFilter; import ai.startree.thirdeye.spi.datalayer.Predicate; import ai.startree.thirdeye.spi.datalayer.bao.TaskManager; +import ai.startree.thirdeye.spi.datalayer.dto.AbstractDTO; import ai.startree.thirdeye.spi.datalayer.dto.AuthorizationConfigurationDTO; import ai.startree.thirdeye.spi.datalayer.dto.TaskDTO; import ai.startree.thirdeye.spi.task.TaskInfo; @@ -35,6 +36,7 @@ import com.google.inject.persist.Transactional; import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.Metrics; +import java.sql.Connection; import java.sql.Timestamp; import java.text.DateFormat; import java.text.SimpleDateFormat; @@ -205,12 +207,15 @@ public void purge(@Nullable final Duration expiryDurationOptional, final long startTime = System.nanoTime(); final List tasksToBeDeleted = filter(new DaoFilter() + // non locking read + .setTransactionIsolationLevel(Connection.TRANSACTION_READ_UNCOMMITTED) .setPredicate(Predicate.LT("createTime", formattedDate)) .setLimit((long) limit) ); /* Delete each task */ - tasksToBeDeleted.forEach(this::delete); + // locking but using primary key, should only lock the impacted rows + deleteByIds(tasksToBeDeleted.stream().map(AbstractDTO::getId).toList()); final double totalTime = (System.nanoTime() - startTime) / 1e9; @@ -239,11 +244,7 @@ public void cleanupOrphanTasks(final Timestamp activeThreshold) { ); } - public long countByStatus(final TaskStatus status) { - return count(Predicate.EQ("status", status.toString())); - } - - public long countBy(final TaskStatus status, final TaskType type) { + private long countBy(final TaskStatus status, final TaskType type) { return count(Predicate.AND( Predicate.EQ("status", status.toString()), Predicate.EQ("type", type))); @@ -276,11 +277,13 @@ public void registerDatabaseMetrics() { LOG.info("Registered task database metrics."); } - // FIXME CYRIL - this should have as less cache as possible and as precise as possible - // TODO CYRIL scale - compute this in database directly - for the moment we assume the filter is such that the number of tasks returned is small + // FIXME CYRIL - this should have as less cache as possible + // TODO CYRIL scale - compute this in database directly - for the moment we assume the filter is such that the number of tasks returned is small + // TODO CYRIL - may be simpler to perform a single group by query now that there is TRANSACTION_READ_UNCOMMITTED private long getTaskLatency(final TaskType type, TaskStatus... pendingStatuses) { // fetch pending tasks from DB final DaoFilter filter = new DaoFilter() + .setTransactionIsolationLevel(Connection.TRANSACTION_READ_UNCOMMITTED) .setPredicate(Predicate.AND( Predicate.IN("status", pendingStatuses), Predicate.EQ("type", type) @@ -376,7 +379,9 @@ public int deleteRecordsOlderThanDays(final int days) { @Override public List findAll() { - return dao.getAll(); + // this operation can stress the task execution framework - if there is no use case for it, keep it unsupported + // most use cases should use a filter by namespace + throw new UnsupportedOperationException("findAll operation is not supported for Tasks. If this error is unexpected, please reach out to support."); } @Override diff --git a/thirdeye-persistence/src/main/java/ai/startree/thirdeye/datalayer/dao/TaskDao.java b/thirdeye-persistence/src/main/java/ai/startree/thirdeye/datalayer/dao/TaskDao.java index 0f23b55a05..3bbb26885c 100644 --- a/thirdeye-persistence/src/main/java/ai/startree/thirdeye/datalayer/dao/TaskDao.java +++ b/thirdeye-persistence/src/main/java/ai/startree/thirdeye/datalayer/dao/TaskDao.java @@ -65,7 +65,6 @@ AND ref_id not in (select ref_id from task_entity where status = 'RUNNING') FOR UPDATE SKIP LOCKED """; private static final Logger LOG = LoggerFactory.getLogger(TaskDao.class); - private static final boolean IS_DEBUG = LOG.isDebugEnabled(); private final DatabaseOrm databaseOrm; private final DatabaseClient databaseClient; @@ -179,19 +178,6 @@ public int update(final TaskDTO pojo, final Predicate predicate) { } } - public List getAll() { - try { - final List entities = databaseClient.executeTransaction( - (connection) -> databaseOrm.findAll(null, - null, null, TaskEntity.class, connection)); - return toDto(entities); - } catch (final Exception e) { - LOG.error(e.getMessage(), e); - // TODO CYRIL design - surface exception ? - return Collections.emptyList(); - } - } - public List list(final long limit, final long offset) { try { final List entities = databaseClient.executeTransaction( @@ -231,18 +217,27 @@ public List filter(final DaoFilter daoFilter) { requireNonNull(daoFilter.getPredicate(), "If the predicate is null, you can just do " + "getAll() which doesn't need to fetch IDs first"); - return get(daoFilter.getPredicate(), daoFilter.getLimit()); + return get(daoFilter.getPredicate(), daoFilter.getLimit(), daoFilter.getTransactionIsolationLevel()); } public List get(final Predicate predicate) { return get(predicate, null); } - public List get(final Predicate predicate, @Nullable Long limit) { + public List get(final Predicate predicate, final @Nullable Long limit) { + return get(predicate, limit, null); + } + + public List get(final Predicate predicate, final @Nullable Long limit, final @Nullable Integer transactionIsolationLevel) { try { final List entities = databaseClient.executeTransaction( - (connection) -> databaseOrm.findAll( - predicate, limit, null, TaskEntity.class, connection)); + (connection) -> { + if (transactionIsolationLevel != null) { + connection.setTransactionIsolation(transactionIsolationLevel); + } + return databaseOrm.findAll( + predicate, limit, null, TaskEntity.class, connection); + }); return toDto(entities); } catch (final Exception e) { LOG.error(e.getMessage(), e); @@ -254,7 +249,10 @@ public List get(final Predicate predicate, @Nullable Long limit) { public long count() { try { return databaseClient.executeTransaction( - (connection) -> databaseOrm.count(null, TaskEntity.class, connection)); + (connection) -> { + connection.setTransactionIsolation(Connection.TRANSACTION_READ_UNCOMMITTED); + return databaseOrm.count(null, TaskEntity.class, connection); + }); } catch (Exception e) { LOG.error(e.getMessage(), e); // TODO CYRIL design - surface exception ? @@ -265,7 +263,10 @@ public long count() { public long count(final Predicate predicate) { try { return databaseClient.executeTransaction( - (connection) -> databaseOrm.count(predicate, TaskEntity.class, connection)); + (connection) -> { + connection.setTransactionIsolation(Connection.TRANSACTION_READ_UNCOMMITTED); + return databaseOrm.count(predicate, TaskEntity.class, connection); + }); } catch (Exception e) { LOG.error(e.getMessage(), e); // TODO CYRIL design - surface exception ? diff --git a/thirdeye-persistence/src/test/java/ai/startree/thirdeye/datalayer/bao/TestAnomalyTaskManager.java b/thirdeye-persistence/src/test/java/ai/startree/thirdeye/datalayer/bao/TestAnomalyTaskManager.java index 283f5a48fc..c8fd88d07a 100644 --- a/thirdeye-persistence/src/test/java/ai/startree/thirdeye/datalayer/bao/TestAnomalyTaskManager.java +++ b/thirdeye-persistence/src/test/java/ai/startree/thirdeye/datalayer/bao/TestAnomalyTaskManager.java @@ -14,9 +14,12 @@ package ai.startree.thirdeye.datalayer.bao; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import ai.startree.thirdeye.aspect.TimeProvider; import ai.startree.thirdeye.datalayer.MySqlTestDatabase; +import ai.startree.thirdeye.spi.datalayer.DaoFilter; +import ai.startree.thirdeye.spi.datalayer.Predicate; import ai.startree.thirdeye.spi.datalayer.bao.TaskManager; import ai.startree.thirdeye.spi.datalayer.dto.TaskDTO; import ai.startree.thirdeye.spi.task.TaskInfo; @@ -47,6 +50,9 @@ */ public class TestAnomalyTaskManager { + private static final DaoFilter ALL_IN_NULL_NAMESPACE = new DaoFilter().setPredicate( + Predicate.EQ("namespace", null)); + private static final Set allowedOldTaskStatus = new HashSet<>(); private static final TimeProvider CLOCK = TimeProvider.instance(); @@ -73,7 +79,7 @@ void beforeClass() { @AfterClass(alwaysRun = true) public void afterClass() { CLOCK.useSystemTime(); - taskDAO.findAll().forEach(taskDAO::delete); + taskDAO.filter(ALL_IN_NULL_NAMESPACE).forEach(taskDAO::delete); } @Test @@ -83,14 +89,19 @@ public void testCreate() throws JsonProcessingException { anomalyTaskId2 = taskDAO.save(getTestTaskSpec( 2)); Assert.assertNotNull(anomalyTaskId2); } + + @Test + public void testFilterAllNotSupported() { + assertThatThrownBy(() -> taskDAO.findAll()).isInstanceOf(UnsupportedOperationException.class); + } @Test(dependsOnMethods = {"testCreate"}) - public void testFindAll() { - List anomalyTasks = taskDAO.findAll(); + public void testFilterAllInANamespace() { + List anomalyTasks = taskDAO.filter(ALL_IN_NULL_NAMESPACE); Assert.assertEquals(anomalyTasks.size(), 2); } - @Test(dependsOnMethods = {"testFindAll"}) + @Test(dependsOnMethods = {"testFilterAllInANamespace"}) public void testAcquireTaskToRun() throws Exception { CLOCK.tick(1); final Long workerId = 1L; diff --git a/thirdeye-persistence/src/test/java/ai/startree/thirdeye/datalayer/dao/TestTaskDao.java b/thirdeye-persistence/src/test/java/ai/startree/thirdeye/datalayer/dao/TestTaskDao.java index cef7f7e435..a7ffdf5d98 100644 --- a/thirdeye-persistence/src/test/java/ai/startree/thirdeye/datalayer/dao/TestTaskDao.java +++ b/thirdeye-persistence/src/test/java/ai/startree/thirdeye/datalayer/dao/TestTaskDao.java @@ -16,6 +16,7 @@ import static org.assertj.core.api.Assertions.assertThat; import ai.startree.thirdeye.datalayer.MySqlTestDatabase; +import ai.startree.thirdeye.spi.datalayer.DaoFilter; import ai.startree.thirdeye.spi.datalayer.Predicate; import ai.startree.thirdeye.spi.datalayer.dto.TaskDTO; import ai.startree.thirdeye.spi.task.TaskStatus; @@ -28,6 +29,8 @@ public class TestTaskDao { + public static final DaoFilter ALL_IN_NULL_NAMESPACE = new DaoFilter().setPredicate( + Predicate.EQ("namespace", null)); private TaskDao dao; @BeforeClass @@ -38,7 +41,7 @@ void beforeClass() { @AfterClass(alwaysRun = true) public void afterClass() { - dao.getAll().forEach(task -> dao.delete(task.getId())); + dao.filter(ALL_IN_NULL_NAMESPACE).forEach(task -> dao.delete(task.getId())); } private TaskDTO buildTask() { @@ -62,7 +65,7 @@ public void saveTest() { @Test(dependsOnMethods = {"saveTest"}) public void updateTest() { - TaskDTO dto = dao.getAll().get(0); + TaskDTO dto = dao.filter(ALL_IN_NULL_NAMESPACE).get(0); dto.setVersion(dto.getVersion()+1); assertThat(dao.update(dto)).isEqualTo(1); assertThat(dao.get(dto.getId()).getVersion()).isEqualTo(dto.getVersion()); @@ -70,7 +73,7 @@ public void updateTest() { @Test(dependsOnMethods = {"updateTest"}) public void deleteTest() { - List tasks = dao.getAll(); + List tasks = dao.filter(ALL_IN_NULL_NAMESPACE); assertThat(tasks.size()).isGreaterThan(0); TaskDTO taskToDelete = tasks.get(0); dao.delete(taskToDelete.getId()); diff --git a/thirdeye-spi/src/main/java/ai/startree/thirdeye/spi/datalayer/DaoFilter.java b/thirdeye-spi/src/main/java/ai/startree/thirdeye/spi/datalayer/DaoFilter.java index 13fab70b30..0a66ea7851 100644 --- a/thirdeye-spi/src/main/java/ai/startree/thirdeye/spi/datalayer/DaoFilter.java +++ b/thirdeye-spi/src/main/java/ai/startree/thirdeye/spi/datalayer/DaoFilter.java @@ -23,6 +23,10 @@ public class DaoFilter { private Long offset; private String orderByKey; private boolean isDesc = false; + + // for instance: Connection.TRANSACTION_READ_UNCOMMITTED - if not set, the transaction isolation should not be set + // NOTE: this parameter is not supported by all DAOs yet + private Integer transactionIsolationLevel = null; public Predicate getPredicate() { return predicate; @@ -78,4 +82,13 @@ public DaoFilter setDesc(final boolean desc) { isDesc = desc; return this; } + + public Integer getTransactionIsolationLevel() { + return transactionIsolationLevel; + } + + public DaoFilter setTransactionIsolationLevel(final Integer transactionIsolationLevel) { + this.transactionIsolationLevel = transactionIsolationLevel; + return this; + } } diff --git a/thirdeye-spi/src/main/java/ai/startree/thirdeye/spi/datalayer/bao/TaskManager.java b/thirdeye-spi/src/main/java/ai/startree/thirdeye/spi/datalayer/bao/TaskManager.java index 706199481b..9ad5ef1a48 100644 --- a/thirdeye-spi/src/main/java/ai/startree/thirdeye/spi/datalayer/bao/TaskManager.java +++ b/thirdeye-spi/src/main/java/ai/startree/thirdeye/spi/datalayer/bao/TaskManager.java @@ -56,6 +56,4 @@ void updateStatusAndTaskEndTime(Long id, TaskStatus oldStatus, TaskStatus newSta void purge(Duration expiryDuration, Integer limitOptional); void cleanupOrphanTasks(Timestamp activeThreshold); - - long countByStatus(final TaskStatus status); }