diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt index a56129850..8da9c1e4c 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt @@ -45,6 +45,7 @@ import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder import org.opensearch.search.aggregations.metrics.CardinalityAggregationBuilder import org.opensearch.search.aggregations.support.MultiTermsValuesSourceConfig import org.opensearch.search.builder.SearchSourceBuilder +import org.opensearch.test.OpenSearchTestCase import java.net.URLEncoder import java.time.Instant import java.time.ZonedDateTime @@ -53,6 +54,7 @@ import java.time.temporal.ChronoUnit import java.time.temporal.ChronoUnit.DAYS import java.time.temporal.ChronoUnit.MILLIS import java.time.temporal.ChronoUnit.MINUTES +import java.util.concurrent.TimeUnit import kotlin.collections.HashMap class MonitorRunnerServiceIT : AlertingRestTestCase() { @@ -137,7 +139,9 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { verifyAlert(firstRunAlert, monitor) // Runner uses ThreadPool.CachedTimeThread thread which only updates once every 200 ms. Wait a bit to // see lastNotificationTime change. - Thread.sleep(200) + OpenSearchTestCase.waitUntil({ + return@waitUntil false + }, 200, TimeUnit.MILLISECONDS) executeMonitor(monitor.id) val secondRunAlert = searchAlerts(monitor).single() verifyAlert(secondRunAlert, monitor) @@ -239,7 +243,9 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { // Runner uses ThreadPool.CachedTimeThread thread which only updates once every 200 ms. Wait a bit to // let lastNotificationTime change. W/o this sleep the test can result in a false negative. - Thread.sleep(200) + OpenSearchTestCase.waitUntil({ + return@waitUntil false + }, 200, TimeUnit.MILLISECONDS) val response = executeMonitor(monitor.id) val output = entityAsMap(response) @@ -739,7 +745,9 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { verifyAlert(activeAlert1.single(), monitor, ACTIVE) val actionResults1 = verifyActionExecutionResultInAlert(activeAlert1[0], mutableMapOf(Pair(actionThrottleEnabled.id, 0))) - Thread.sleep(200) + OpenSearchTestCase.waitUntil({ + return@waitUntil false + }, 200, TimeUnit.MILLISECONDS) updateMonitor(monitor.copy(triggers = listOf(trigger.copy(condition = NEVER_RUN)), id = monitor.id)) executeMonitor(monitor.id) val completedAlert = searchAlerts(monitor, AlertIndices.ALL_ALERT_INDEX_PATTERN).single() @@ -1372,7 +1380,9 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { // Runner uses ThreadPool.CachedTimeThread thread which only updates once every 200 ms. Wait a bit to // let lastNotificationTime change. W/o this sleep the test can result in a false negative. - Thread.sleep(200) + OpenSearchTestCase.waitUntil({ + return@waitUntil false + }, 200, TimeUnit.MILLISECONDS) executeMonitor(monitor.id) // Check that the lastNotification time of the acknowledged Alert wasn't updated and the active Alert's was @@ -1392,7 +1402,9 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { ) // Execute Monitor and check that both Alerts were updated - Thread.sleep(200) + OpenSearchTestCase.waitUntil({ + return@waitUntil false + }, 200, TimeUnit.MILLISECONDS) executeMonitor(monitor.id) currentAlerts = searchAlerts(monitor, AlertIndices.ALL_ALERT_INDEX_PATTERN) val completedAlerts = currentAlerts.filter { it.state == COMPLETED } @@ -1914,7 +1926,9 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { // Runner uses ThreadPool.CachedTimeThread thread which only updates once every 200 ms. Wait a bit to // let Action executionTime change. W/o this sleep the test can result in a false negative. - Thread.sleep(200) + OpenSearchTestCase.waitUntil({ + return@waitUntil false + }, 200, TimeUnit.MILLISECONDS) val monitorRunResultThrottled = entityAsMap(executeMonitor(monitor.id)) verifyActionThrottleResultsForBucketLevelMonitor( monitorRunResult = monitorRunResultThrottled, diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/alerts/AlertIndicesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/alerts/AlertIndicesIT.kt index b058da877..3bc702616 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/alerts/AlertIndicesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/alerts/AlertIndicesIT.kt @@ -140,7 +140,9 @@ class AlertIndicesIT : AlertingRestTestCase() { executeMonitor(trueMonitor) // Allow for a rollover index. - Thread.sleep(2000) + OpenSearchTestCase.waitUntil({ + return@waitUntil (getAlertIndices().size >= 3) + }, 2, TimeUnit.SECONDS) assertTrue("Did not find 3 alert indices", getAlertIndices().size >= 3) } @@ -157,7 +159,9 @@ class AlertIndicesIT : AlertingRestTestCase() { executeMonitor(trueMonitor.id) // Allow for a rollover index. - Thread.sleep(2000) + OpenSearchTestCase.waitUntil({ + return@waitUntil (getFindingIndices().size >= 2) + }, 2, TimeUnit.SECONDS) assertTrue("Did not find 2 alert indices", getFindingIndices().size >= 2) } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/bwc/AlertingBackwardsCompatibilityIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/bwc/AlertingBackwardsCompatibilityIT.kt index 937be869d..fcf11af57 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/bwc/AlertingBackwardsCompatibilityIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/bwc/AlertingBackwardsCompatibilityIT.kt @@ -16,6 +16,8 @@ import org.opensearch.commons.alerting.model.Monitor import org.opensearch.core.rest.RestStatus import org.opensearch.index.query.QueryBuilders import org.opensearch.search.builder.SearchSourceBuilder +import org.opensearch.test.OpenSearchTestCase +import java.util.concurrent.TimeUnit class AlertingBackwardsCompatibilityIT : AlertingRestTestCase() { @@ -69,8 +71,21 @@ class AlertingBackwardsCompatibilityIT : AlertingRestTestCase() { // the test execution by a lot (might have to wait for Job Scheduler plugin integration first) // Waiting a minute to ensure the Monitor ran again at least once before checking if the job is running // on time - Thread.sleep(60000) - verifyMonitorStats("/_plugins/_alerting") + var passed = false + OpenSearchTestCase.waitUntil({ + try { + // Run verifyMonitorStats until all assertion test passes + verifyMonitorStats("/_plugins/_alerting") + passed = true + return@waitUntil true + } catch (e: AssertionError) { + return@waitUntil false + } + }, 1, TimeUnit.MINUTES) + if (!passed) { + // if it hit the max time (1 minute), run verifyMonitorStats again to make sure all the tests pass + verifyMonitorStats("/_plugins/_alerting") + } } } break diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/MonitorRestApiIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/MonitorRestApiIT.kt index 5450081c7..d019be5ec 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/MonitorRestApiIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/MonitorRestApiIT.kt @@ -820,7 +820,11 @@ class MonitorRestApiIT : AlertingRestTestCase() { assertEquals("Delete request not successful", RestStatus.OK, deleteResponse.restStatus()) // Wait 5 seconds for event to be processed and alerts moved - Thread.sleep(5000) + OpenSearchTestCase.waitUntil({ + val alerts = searchAlerts(monitor) + val historyAlerts = searchAlerts(monitor, AlertIndices.ALERT_HISTORY_WRITE_INDEX) + return@waitUntil (alerts.isEmpty() && historyAlerts.size == 1) + }, 5, TimeUnit.SECONDS) val alerts = searchAlerts(monitor) assertEquals("Active alert was not deleted", 0, alerts.size) @@ -851,7 +855,9 @@ class MonitorRestApiIT : AlertingRestTestCase() { assertEquals("Update request not successful", RestStatus.OK, updateResponse.restStatus()) // Wait 5 seconds for event to be processed and alerts moved - Thread.sleep(5000) + OpenSearchTestCase.waitUntil({ + return@waitUntil false + }, 5, TimeUnit.SECONDS) val alerts = searchAlerts(monitor) assertEquals("Active alert was not deleted", 0, alerts.size) @@ -881,7 +887,11 @@ class MonitorRestApiIT : AlertingRestTestCase() { assertEquals("Update request not successful", RestStatus.OK, updateResponse.restStatus()) // Wait 5 seconds for event to be processed and alerts moved - Thread.sleep(5000) + OpenSearchTestCase.waitUntil({ + val alerts = searchAlerts(monitor) + val historyAlerts = searchAlerts(monitor, AlertIndices.ALERT_HISTORY_WRITE_INDEX) + return@waitUntil (alerts.isEmpty() && historyAlerts.size == 1) + }, 5, TimeUnit.SECONDS) val alerts = searchAlerts(monitor) assertEquals("Active alert was not deleted", 0, alerts.size) @@ -1002,7 +1012,9 @@ class MonitorRestApiIT : AlertingRestTestCase() { enableScheduledJob() // Sleep briefly so sweep can reschedule the Monitor - Thread.sleep(2000) + OpenSearchTestCase.waitUntil({ + return@waitUntil false + }, 2, TimeUnit.SECONDS) alertingStats = getAlertingStats() assertAlertingStatsSweeperEnabled(alertingStats, true) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt index 9cd2c5e26..cf671e10e 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt @@ -41,6 +41,7 @@ import java.time.temporal.ChronoUnit import java.util.Collections import java.util.Locale import java.util.UUID +import java.util.concurrent.TimeUnit @TestLogging("level:DEBUG", reason = "Debug for tests.") @Suppress("UNCHECKED_CAST") @@ -1180,7 +1181,10 @@ class WorkflowRestApiIT : AlertingRestTestCase() { }""" indexDoc(index, "1", testDoc) - Thread.sleep(80000) + OpenSearchTestCase.waitUntil({ + val findings = searchFindings(monitor.copy(id = monitorResponse.id)) + return@waitUntil (findings.size == 1) + }, 80, TimeUnit.SECONDS) val findings = searchFindings(monitor.copy(id = monitorResponse.id)) assertEquals("Findings saved for test monitor", 1, findings.size) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/util/destinationmigration/DestinationMigrationUtilServiceIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/util/destinationmigration/DestinationMigrationUtilServiceIT.kt index f9c40e465..903eedb44 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/util/destinationmigration/DestinationMigrationUtilServiceIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/util/destinationmigration/DestinationMigrationUtilServiceIT.kt @@ -18,8 +18,10 @@ import org.opensearch.alerting.util.DestinationType import org.opensearch.client.ResponseException import org.opensearch.commons.alerting.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX import org.opensearch.core.rest.RestStatus +import org.opensearch.test.OpenSearchTestCase import java.time.Instant import java.util.UUID +import java.util.concurrent.TimeUnit class DestinationMigrationUtilServiceIT : AlertingRestTestCase() { @@ -80,7 +82,9 @@ class DestinationMigrationUtilServiceIT : AlertingRestTestCase() { // Create cluster change event and wait for migration service to complete migrating data over client().updateSettings("indices.recovery.max_bytes_per_sec", "40mb") - Thread.sleep(120000) + OpenSearchTestCase.waitUntil({ + return@waitUntil false + }, 2, TimeUnit.MINUTES) for (id in ids) { val response = client().makeRequest(