Skip to content

Commit

Permalink
backport changes
Browse files Browse the repository at this point in the history
Signed-off-by: Jacob Choi <[email protected]>
  • Loading branch information
JacobCho-i committed Nov 11, 2023
1 parent f729cc0 commit 09cbbf6
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder
import org.opensearch.search.aggregations.metrics.CardinalityAggregationBuilder
import org.opensearch.search.aggregations.support.MultiTermsValuesSourceConfig
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.test.OpenSearchTestCase
import java.net.URLEncoder
import java.time.Instant
import java.time.ZonedDateTime
Expand All @@ -53,6 +54,7 @@ import java.time.temporal.ChronoUnit
import java.time.temporal.ChronoUnit.DAYS
import java.time.temporal.ChronoUnit.MILLIS
import java.time.temporal.ChronoUnit.MINUTES
import java.util.concurrent.TimeUnit
import kotlin.collections.HashMap

class MonitorRunnerServiceIT : AlertingRestTestCase() {
Expand Down Expand Up @@ -137,7 +139,9 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {
verifyAlert(firstRunAlert, monitor)
// Runner uses ThreadPool.CachedTimeThread thread which only updates once every 200 ms. Wait a bit to
// see lastNotificationTime change.
Thread.sleep(200)
OpenSearchTestCase.waitUntil({
return@waitUntil false
}, 200, TimeUnit.MILLISECONDS)
executeMonitor(monitor.id)
val secondRunAlert = searchAlerts(monitor).single()
verifyAlert(secondRunAlert, monitor)
Expand Down Expand Up @@ -239,7 +243,9 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {

// Runner uses ThreadPool.CachedTimeThread thread which only updates once every 200 ms. Wait a bit to
// let lastNotificationTime change. W/o this sleep the test can result in a false negative.
Thread.sleep(200)
OpenSearchTestCase.waitUntil({
return@waitUntil false
}, 200, TimeUnit.MILLISECONDS)
val response = executeMonitor(monitor.id)

val output = entityAsMap(response)
Expand Down Expand Up @@ -739,7 +745,9 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {
verifyAlert(activeAlert1.single(), monitor, ACTIVE)
val actionResults1 = verifyActionExecutionResultInAlert(activeAlert1[0], mutableMapOf(Pair(actionThrottleEnabled.id, 0)))

Thread.sleep(200)
OpenSearchTestCase.waitUntil({
return@waitUntil false
}, 200, TimeUnit.MILLISECONDS)
updateMonitor(monitor.copy(triggers = listOf(trigger.copy(condition = NEVER_RUN)), id = monitor.id))
executeMonitor(monitor.id)
val completedAlert = searchAlerts(monitor, AlertIndices.ALL_ALERT_INDEX_PATTERN).single()
Expand Down Expand Up @@ -1372,7 +1380,9 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {

// Runner uses ThreadPool.CachedTimeThread thread which only updates once every 200 ms. Wait a bit to
// let lastNotificationTime change. W/o this sleep the test can result in a false negative.
Thread.sleep(200)
OpenSearchTestCase.waitUntil({
return@waitUntil false
}, 200, TimeUnit.MILLISECONDS)
executeMonitor(monitor.id)

// Check that the lastNotification time of the acknowledged Alert wasn't updated and the active Alert's was
Expand All @@ -1392,7 +1402,9 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {
)

// Execute Monitor and check that both Alerts were updated
Thread.sleep(200)
OpenSearchTestCase.waitUntil({
return@waitUntil false
}, 200, TimeUnit.MILLISECONDS)
executeMonitor(monitor.id)
currentAlerts = searchAlerts(monitor, AlertIndices.ALL_ALERT_INDEX_PATTERN)
val completedAlerts = currentAlerts.filter { it.state == COMPLETED }
Expand Down Expand Up @@ -1914,7 +1926,9 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {

// Runner uses ThreadPool.CachedTimeThread thread which only updates once every 200 ms. Wait a bit to
// let Action executionTime change. W/o this sleep the test can result in a false negative.
Thread.sleep(200)
OpenSearchTestCase.waitUntil({
return@waitUntil false
}, 200, TimeUnit.MILLISECONDS)
val monitorRunResultThrottled = entityAsMap(executeMonitor(monitor.id))
verifyActionThrottleResultsForBucketLevelMonitor(
monitorRunResult = monitorRunResultThrottled,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,9 @@ class AlertIndicesIT : AlertingRestTestCase() {
executeMonitor(trueMonitor)

// Allow for a rollover index.
Thread.sleep(2000)
OpenSearchTestCase.waitUntil({
return@waitUntil (getAlertIndices().size >= 3)
}, 2, TimeUnit.SECONDS)
assertTrue("Did not find 3 alert indices", getAlertIndices().size >= 3)
}

Expand All @@ -157,7 +159,9 @@ class AlertIndicesIT : AlertingRestTestCase() {
executeMonitor(trueMonitor.id)

// Allow for a rollover index.
Thread.sleep(2000)
OpenSearchTestCase.waitUntil({
return@waitUntil (getFindingIndices().size >= 2)
}, 2, TimeUnit.SECONDS)
assertTrue("Did not find 2 alert indices", getFindingIndices().size >= 2)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.core.rest.RestStatus
import org.opensearch.index.query.QueryBuilders
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.test.OpenSearchTestCase
import java.util.concurrent.TimeUnit

class AlertingBackwardsCompatibilityIT : AlertingRestTestCase() {

Expand Down Expand Up @@ -69,8 +71,21 @@ class AlertingBackwardsCompatibilityIT : AlertingRestTestCase() {
// the test execution by a lot (might have to wait for Job Scheduler plugin integration first)
// Waiting a minute to ensure the Monitor ran again at least once before checking if the job is running
// on time
Thread.sleep(60000)
verifyMonitorStats("/_plugins/_alerting")
var passed = false
OpenSearchTestCase.waitUntil({
try {
// Run verifyMonitorStats until all assertion test passes
verifyMonitorStats("/_plugins/_alerting")
passed = true
return@waitUntil true
} catch (e: AssertionError) {
return@waitUntil false
}
}, 1, TimeUnit.MINUTES)
if (!passed) {
// if it hit the max time (1 minute), run verifyMonitorStats again to make sure all the tests pass
verifyMonitorStats("/_plugins/_alerting")
}
}
}
break
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -820,7 +820,11 @@ class MonitorRestApiIT : AlertingRestTestCase() {
assertEquals("Delete request not successful", RestStatus.OK, deleteResponse.restStatus())

// Wait 5 seconds for event to be processed and alerts moved
Thread.sleep(5000)
OpenSearchTestCase.waitUntil({
val alerts = searchAlerts(monitor)
val historyAlerts = searchAlerts(monitor, AlertIndices.ALERT_HISTORY_WRITE_INDEX)
return@waitUntil (alerts.isEmpty() && historyAlerts.size == 1)
}, 5, TimeUnit.SECONDS)

val alerts = searchAlerts(monitor)
assertEquals("Active alert was not deleted", 0, alerts.size)
Expand Down Expand Up @@ -851,7 +855,9 @@ class MonitorRestApiIT : AlertingRestTestCase() {
assertEquals("Update request not successful", RestStatus.OK, updateResponse.restStatus())

// Wait 5 seconds for event to be processed and alerts moved
Thread.sleep(5000)
OpenSearchTestCase.waitUntil({
return@waitUntil false
}, 5, TimeUnit.SECONDS)

val alerts = searchAlerts(monitor)
assertEquals("Active alert was not deleted", 0, alerts.size)
Expand Down Expand Up @@ -881,7 +887,11 @@ class MonitorRestApiIT : AlertingRestTestCase() {
assertEquals("Update request not successful", RestStatus.OK, updateResponse.restStatus())

// Wait 5 seconds for event to be processed and alerts moved
Thread.sleep(5000)
OpenSearchTestCase.waitUntil({
val alerts = searchAlerts(monitor)
val historyAlerts = searchAlerts(monitor, AlertIndices.ALERT_HISTORY_WRITE_INDEX)
return@waitUntil (alerts.isEmpty() && historyAlerts.size == 1)
}, 5, TimeUnit.SECONDS)

val alerts = searchAlerts(monitor)
assertEquals("Active alert was not deleted", 0, alerts.size)
Expand Down Expand Up @@ -1002,7 +1012,9 @@ class MonitorRestApiIT : AlertingRestTestCase() {
enableScheduledJob()

// Sleep briefly so sweep can reschedule the Monitor
Thread.sleep(2000)
OpenSearchTestCase.waitUntil({
return@waitUntil false
}, 2, TimeUnit.SECONDS)

alertingStats = getAlertingStats()
assertAlertingStatsSweeperEnabled(alertingStats, true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import java.time.temporal.ChronoUnit
import java.util.Collections
import java.util.Locale
import java.util.UUID
import java.util.concurrent.TimeUnit

@TestLogging("level:DEBUG", reason = "Debug for tests.")
@Suppress("UNCHECKED_CAST")
Expand Down Expand Up @@ -1180,7 +1181,10 @@ class WorkflowRestApiIT : AlertingRestTestCase() {
}"""

indexDoc(index, "1", testDoc)
Thread.sleep(80000)
OpenSearchTestCase.waitUntil({
val findings = searchFindings(monitor.copy(id = monitorResponse.id))
return@waitUntil (findings.size == 1)
}, 80, TimeUnit.SECONDS)

val findings = searchFindings(monitor.copy(id = monitorResponse.id))
assertEquals("Findings saved for test monitor", 1, findings.size)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ import org.opensearch.alerting.util.DestinationType
import org.opensearch.client.ResponseException
import org.opensearch.commons.alerting.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX
import org.opensearch.core.rest.RestStatus
import org.opensearch.test.OpenSearchTestCase
import java.time.Instant
import java.util.UUID
import java.util.concurrent.TimeUnit

class DestinationMigrationUtilServiceIT : AlertingRestTestCase() {

Expand Down Expand Up @@ -80,7 +82,9 @@ class DestinationMigrationUtilServiceIT : AlertingRestTestCase() {

// Create cluster change event and wait for migration service to complete migrating data over
client().updateSettings("indices.recovery.max_bytes_per_sec", "40mb")
Thread.sleep(120000)
OpenSearchTestCase.waitUntil({
return@waitUntil false
}, 2, TimeUnit.MINUTES)

for (id in ids) {
val response = client().makeRequest(
Expand Down

0 comments on commit 09cbbf6

Please sign in to comment.