Skip to content

Commit

Permalink
[scheduler] Add core logic to accomodate monthly tasks limit (#1756)
Browse files Browse the repository at this point in the history
* [scheduler] Add core logic to accomodate monthly tasks limit

* Add startDate constraint

* Skip scheduling jobs if quota is already exceeded

* Address comments

* make namespaceQuotaMapRefreshDelay configurable

* Address feedback

* Make cache mutable map and fix NPE

* Address feedback

---------

Co-authored-by: Anshul Singh <[email protected]>
  • Loading branch information
anshul98ks123 and Anshul Singh authored Jan 20, 2025
1 parent 0756d06 commit 713d3b4
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import ai.startree.thirdeye.scheduler.job.DetectionPipelineJob;
import ai.startree.thirdeye.spi.datalayer.bao.AlertManager;
import ai.startree.thirdeye.spi.datalayer.bao.NamespaceConfigurationManager;
import ai.startree.thirdeye.spi.datalayer.bao.TaskManager;
import ai.startree.thirdeye.spi.datalayer.dto.AlertDTO;
import ai.startree.thirdeye.spi.task.TaskType;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
Expand All @@ -39,17 +41,24 @@ public class DetectionCronScheduler {
private final AlertManager alertManager;
private TaskCronSchedulerRunnable<AlertDTO> runnable;

private final TaskManager taskManager;
private final NamespaceConfigurationManager namespaceConfigurationManager;

@Inject
public DetectionCronScheduler(
final AlertManager alertManager,
final ThirdEyeSchedulerConfiguration configuration,
final GuiceJobFactory guiceJobFactory) {
final GuiceJobFactory guiceJobFactory,
final TaskManager taskManager,
final NamespaceConfigurationManager namespaceConfigurationManager) {
this.configuration = configuration;
this.guiceJobFactory = guiceJobFactory;
executorService = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("detection-cron-%d").build());

this.alertManager = alertManager;
this.taskManager = taskManager;
this.namespaceConfigurationManager = namespaceConfigurationManager;
}

public void start() throws SchedulerException {
Expand All @@ -62,7 +71,10 @@ public void start() throws SchedulerException {
DetectionPipelineJob.class,
guiceJobFactory,
DETECTION_SCHEDULER_CRON_MAX_TRIGGERS_PER_MINUTE,
this.getClass()
this.getClass(),
taskManager,
namespaceConfigurationManager,
configuration.getNamespaceQuotaCacheDurationSeconds()
);
executorService.scheduleWithFixedDelay(runnable,
0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
import static ai.startree.thirdeye.spi.util.ExecutorUtils.shutdownExecutionService;

import ai.startree.thirdeye.scheduler.job.NotificationPipelineJob;
import ai.startree.thirdeye.spi.datalayer.bao.NamespaceConfigurationManager;
import ai.startree.thirdeye.spi.datalayer.bao.SubscriptionGroupManager;
import ai.startree.thirdeye.spi.datalayer.bao.TaskManager;
import ai.startree.thirdeye.spi.datalayer.dto.SubscriptionGroupDTO;
import ai.startree.thirdeye.spi.task.TaskType;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
Expand All @@ -43,17 +45,24 @@ public class SubscriptionCronScheduler {
private final SubscriptionGroupManager subscriptionGroupManager;
private TaskCronSchedulerRunnable<SubscriptionGroupDTO> runnable;

private final TaskManager taskManager;
private final NamespaceConfigurationManager namespaceConfigurationManager;

@Inject
public SubscriptionCronScheduler(
final SubscriptionGroupManager subscriptionGroupManager,
final ThirdEyeSchedulerConfiguration configuration,
final GuiceJobFactory guiceJobFactory) {
final GuiceJobFactory guiceJobFactory,
final TaskManager taskManager,
final NamespaceConfigurationManager namespaceConfigurationManager) {
this.configuration = configuration;
this.guiceJobFactory = guiceJobFactory;
executorService = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("subscription-scheduler-%d").build());

this.subscriptionGroupManager = subscriptionGroupManager;
this.taskManager = taskManager;
this.namespaceConfigurationManager = namespaceConfigurationManager;
}

public void start() throws SchedulerException {
Expand All @@ -66,7 +75,10 @@ public void start() throws SchedulerException {
NotificationPipelineJob.class,
guiceJobFactory,
SUBSCRIPTION_SCHEDULER_CRON_MAX_TRIGGERS_PER_MINUTE,
this.getClass()
this.getClass(),
taskManager,
namespaceConfigurationManager,
configuration.getNamespaceQuotaCacheDurationSeconds()
);
executorService.scheduleWithFixedDelay(runnable,
0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,33 @@
import static ai.startree.thirdeye.scheduler.JobUtils.currentCron;
import static ai.startree.thirdeye.scheduler.JobUtils.getIdFromJobKey;
import static ai.startree.thirdeye.spi.Constants.CRON_TIMEZONE;
import static ai.startree.thirdeye.spi.util.MetricsUtils.scheduledRefreshSupplier;
import static ai.startree.thirdeye.spi.util.TimeUtils.maximumTriggersPerMinute;
import static com.google.common.base.Preconditions.checkArgument;

import ai.startree.thirdeye.spi.datalayer.Predicate;
import ai.startree.thirdeye.spi.datalayer.bao.AbstractManager;
import ai.startree.thirdeye.spi.datalayer.bao.NamespaceConfigurationManager;
import ai.startree.thirdeye.spi.datalayer.bao.TaskManager;
import ai.startree.thirdeye.spi.datalayer.dto.AbstractDTO;
import ai.startree.thirdeye.spi.datalayer.dto.NamespaceConfigurationDTO;
import ai.startree.thirdeye.spi.datalayer.dto.NamespaceQuotasConfigurationDTO;
import ai.startree.thirdeye.spi.task.TaskType;
import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TimeZone;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.quartz.CronScheduleBuilder;
import org.quartz.Job;
import org.quartz.JobBuilder;
Expand All @@ -42,7 +58,10 @@
import org.slf4j.LoggerFactory;

public class TaskCronSchedulerRunnable<E extends AbstractDTO> implements Runnable {


private static final String NULL_NAMESPACE_KEY = "__NULL_NAMESPACE_" +
RandomStringUtils.randomAlphanumeric(10).toUpperCase();

private final Logger log;
private final Scheduler scheduler;
private final TaskType taskType;
Expand All @@ -53,6 +72,9 @@ public class TaskCronSchedulerRunnable<E extends AbstractDTO> implements Runnabl
private final Class<? extends Job> jobClazz;
private final CronGetter<E> cronGetter;
private final isActiveGetter<E> isActiveGetter;
private final TaskManager taskManager;
private final NamespaceConfigurationManager namespaceConfigurationManager;
private final Supplier<Map<String, Boolean>> namespaceToQuotaExceededSupplier;

public TaskCronSchedulerRunnable(
final AbstractManager<E> entityDao,
Expand All @@ -63,7 +85,10 @@ public TaskCronSchedulerRunnable(
final Class<? extends Job> jobClazz,
final GuiceJobFactory guiceJobFactory,
final int cronMaxTriggersPerMinute,
final Class<?> loggerClass) {
final Class<?> loggerClass,
final TaskManager taskManager,
final NamespaceConfigurationManager namespaceConfigurationManager,
final long namespaceQuotaCacheDurationSeconds) {
try {
scheduler = StdSchedulerFactory.getDefaultScheduler();
scheduler.setJobFactory(guiceJobFactory);
Expand All @@ -80,6 +105,10 @@ public TaskCronSchedulerRunnable(
this.cronMaxTriggersPerMinute = cronMaxTriggersPerMinute;
this.log = LoggerFactory.getLogger(loggerClass);
this.groupMatcher = GroupMatcher.jobGroupEquals(taskType.toString());
this.taskManager = taskManager;
this.namespaceConfigurationManager = namespaceConfigurationManager;
this.namespaceToQuotaExceededSupplier = scheduledRefreshSupplier(
this::getNamespaceToQuotaExceededMap, Duration.ofSeconds(namespaceQuotaCacheDurationSeconds));
}

@Override
Expand All @@ -103,10 +132,14 @@ private void updateSchedules() throws SchedulerException {
// also only fetch only active entities directly and remove is active from Schedulable interface
final List<E> allEntities = entityDao.findAll();

final Map<String, Boolean>
cachedNamespaceToQuotaExceeded = namespaceToQuotaExceededSupplier.get();

// schedule active entities
allEntities.forEach(this::schedule);
allEntities.forEach(e -> schedule(e, cachedNamespaceToQuotaExceeded));

// cleanup schedules of deleted and deactivated entities
// or entities whose workspace has exceeded quotas
final Map<Long, E> idToEntity = allEntities.stream()
.collect(Collectors.toMap(AbstractDTO::getId, e -> e));
final Set<JobKey> scheduledJobKeys = scheduler.getJobKeys(groupMatcher);
Expand All @@ -121,19 +154,69 @@ private void updateSchedules() throws SchedulerException {
} else if (!isActive(entity)) {
log.info("{} with id {} is deactivated. Stopping the scheduled {} job.", entityName, id, taskType);
stopJob(jobKey);
} else if (cachedNamespaceToQuotaExceeded.getOrDefault(nonNullNamespace(entity.namespace()), false)) {
log.info("workspace {} corresponding to {} with id {} has exceeded monthly quota. Stopping scheduled {} job.",
nonNullNamespace(entity.namespace()), entityName, id, taskType);
stopJob(jobKey);
}
} catch (final Exception e) {
log.error("Error removing job key {}", jobKey, e);
}
}
}

private void schedule(final E entity) {
private Map<String, Boolean> getNamespaceToQuotaExceededMap() {
final HashMap<String, Boolean> m = new HashMap<>();
final List<NamespaceConfigurationDTO> namespaceCfgs = namespaceConfigurationManager.findAll();

for (NamespaceConfigurationDTO namespaceCfg : namespaceCfgs) {
final Long monthlyTasksLimit = getMonthlyTasksLimit(namespaceCfg);
if (monthlyTasksLimit == null || monthlyTasksLimit <= 0) {
continue;
}
final String namespace = namespaceCfg.namespace();
final long taskCount = getTasksCountForNamespace(namespaceCfg.namespace());
m.put(nonNullNamespace(namespace), taskCount >= monthlyTasksLimit);
}

return Map.copyOf(m);
}

private Long getMonthlyTasksLimit(final @NonNull NamespaceConfigurationDTO config) {
return Optional.of(config)
.map(NamespaceConfigurationDTO::getNamespaceQuotasConfiguration)
.map(NamespaceQuotasConfigurationDTO::getTaskQuotasConfiguration)
.map(taskQuotasConfig -> switch (taskType) {
case DETECTION -> taskQuotasConfig.getMaximumDetectionTasksPerMonth();
case NOTIFICATION -> taskQuotasConfig.getMaximumNotificationTasksPerMonth();
})
.orElse(null);
}

private long getTasksCountForNamespace(final String namespace) {
final LocalDateTime startOfMonth = LocalDate.now(ZoneOffset.UTC).withDayOfMonth(1).atStartOfDay();
final Predicate predicate = Predicate.AND(
Predicate.EQ("namespace", namespace),
Predicate.EQ("type", taskType),
Predicate.GE("createTime", startOfMonth)
);
return taskManager.count(predicate);
}

private void schedule(final E entity,
final Map<String, Boolean> namespaceToQuotaExceededMap) {
if (!isActive(entity)) {
log.debug("{}: {} is inactive. Skipping.", entityName, entity.getId());
return;
}

final String entityNamespace = nonNullNamespace(entity.namespace());
if (namespaceToQuotaExceededMap.getOrDefault(entityNamespace, false)) {
log.info("workspace {} corresponding to {} with id {} has exceeded monthly quota. Skipping scheduling {} job.",
entityNamespace, entityName, entity.getId(), taskType);
return;
}

// schedule job: add or update job
try {
final String jobName = taskType + "_" + entity.getId();
Expand Down Expand Up @@ -173,7 +256,6 @@ private void stopJob(final JobKey jobKey) throws SchedulerException {
log.info("Stopped {} job {}", taskType, jobKey.getName());
}


private Trigger buildTrigger(final E config) {
final String cron = cronOf(config);
final int maxTriggersPerMinute = maximumTriggersPerMinute(cron);
Expand All @@ -196,6 +278,10 @@ private boolean isActive(final E entity) {
return isActiveGetter.isActive(entity);
}

private static @NonNull String nonNullNamespace(@Nullable String namespace) {
return namespace == null ? NULL_NAMESPACE_KEY : namespace;
}

private String cronOf(final E entity) {
return cronGetter.getCron(entity);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class ThirdEyeSchedulerConfiguration {
private boolean detectionAlert = false;
private boolean dataAvailabilityEventListener = false;
private int alertUpdateDelay = 60;
private int namespaceQuotaCacheDurationSeconds = 300;

// TODO spyne: consolidate all the update delays into a single configuration after consolidating the core scheduler code
private int subscriptionGroupUpdateDelay = 60;
Expand Down Expand Up @@ -116,6 +117,16 @@ public ThirdEyeSchedulerConfiguration setAlertUpdateDelay(final int alertUpdateD
return this;
}

public int getNamespaceQuotaCacheDurationSeconds() {
return namespaceQuotaCacheDurationSeconds;
}

public ThirdEyeSchedulerConfiguration setNamespaceQuotaCacheDurationSeconds(
final int namespaceQuotaCacheDurationSeconds) {
this.namespaceQuotaCacheDurationSeconds = namespaceQuotaCacheDurationSeconds;
return this;
}

public int getSubscriptionGroupUpdateDelay() {
return subscriptionGroupUpdateDelay;
}
Expand Down

0 comments on commit 713d3b4

Please sign in to comment.