Skip to content

Commit

Permalink
optimize execution of workflow consisting of bucket-level followed by…
Browse files Browse the repository at this point in the history
… doc-level monitors (#1729) (#1751)

(cherry picked from commit 3755993)

Signed-off-by: Subhobrata Dey <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent bd82fa1 commit 507391c
Show file tree
Hide file tree
Showing 8 changed files with 248 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import org.opensearch.search.aggregations.AggregatorFactories
import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.search.sort.SortOrder
import org.opensearch.transport.TransportService
import java.time.Instant
import java.util.UUID
Expand Down Expand Up @@ -486,7 +487,7 @@ object BucketLevelMonitorRunner : MonitorRunner() {
val queryBuilder = if (input.query.query() == null) BoolQueryBuilder()
else QueryBuilders.boolQuery().must(source.query())
queryBuilder.filter(QueryBuilders.termsQuery(fieldName, bucketValues))
sr.source().query(queryBuilder)
sr.source().query(queryBuilder).sort("_seq_no", SortOrder.DESC)
}
sr.cancelAfterTimeInterval = TimeValue.timeValueMinutes(
getCancelAfterTimeInterval()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ class DocumentLevelMonitorRunner : MonitorRunner() {

// Map of document ids per index when monitor is workflow delegate and has chained findings
val matchingDocIdsPerIndex = workflowRunContext?.matchingDocIdsPerIndex
val findingIdsForMatchingDocIds = if (workflowRunContext?.findingIds != null) {
workflowRunContext.findingIds
} else {
listOf()
}

val concreteIndicesSeenSoFar = mutableListOf<String>()
val updatedIndexNames = mutableListOf<String>()
Expand Down Expand Up @@ -226,6 +231,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
concreteIndices,
conflictingFields.toList(),
matchingDocIdsPerIndex?.get(concreteIndexName),
findingIdsForMatchingDocIds
)

val shards = mutableSetOf<String>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,16 @@ class WorkflowService(
* @param chainedMonitors Monitors that have previously executed
* @param workflowExecutionId Execution id of the current workflow
*/
suspend fun getFindingDocIdsByExecutionId(chainedMonitors: List<Monitor>, workflowExecutionId: String): Map<String, List<String>> {
suspend fun getFindingDocIdsByExecutionId(chainedMonitors: List<Monitor>, workflowExecutionId: String):
Pair<Map<String, List<String>>, List<String>> {
if (chainedMonitors.isEmpty())
return emptyMap()
return Pair(emptyMap(), listOf())
val dataSources = chainedMonitors[0].dataSources
try {
val existsResponse: IndicesExistsResponse = client.admin().indices().suspendUntil {
exists(IndicesExistsRequest(dataSources.findingsIndex).local(true), it)
}
if (existsResponse.isExists == false) return emptyMap()
if (existsResponse.isExists == false) return Pair(emptyMap(), listOf())
// Search findings index to match id of monitors and workflow execution id
val bqb = QueryBuilders.boolQuery()
.filter(
Expand Down Expand Up @@ -83,7 +84,7 @@ class WorkflowService(
for (finding in findings) {
indexToRelatedDocIdsMap.getOrPut(finding.index) { mutableListOf() }.addAll(finding.relatedDocIds)
}
return indexToRelatedDocIdsMap
return Pair(indexToRelatedDocIdsMap, findings.map { it.id })
} catch (t: Exception) {
log.error("Error getting finding doc ids: ${t.message}", t)
throw AlertingException.wrap(t)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,19 +296,37 @@ class TransportDocLevelMonitorFanOutAction
createFindings(monitor, docsToQueries, idQueryMap, true)
}
} else {
monitor.triggers.forEach {
triggerResults[it.id] = runForEachDocTrigger(
monitorResult,
it as DocumentLevelTrigger,
monitor,
idQueryMap,
docsToQueries,
queryToDocIds,
dryrun,
executionId = executionId,
findingIdToDocSource,
workflowRunContext = workflowRunContext
)
/**
* if should_persist_findings_and_alerts flag is not set, doc-level trigger generates alerts else doc-level trigger
* generates a single alert with multiple findings.
*/
if (monitor.shouldCreateSingleAlertForFindings == null || monitor.shouldCreateSingleAlertForFindings == false) {
monitor.triggers.forEach {
triggerResults[it.id] = runForEachDocTrigger(
monitorResult,
it as DocumentLevelTrigger,
monitor,
idQueryMap,
docsToQueries,
queryToDocIds,
dryrun,
executionId = executionId,
findingIdToDocSource,
workflowRunContext = workflowRunContext
)
}
} else if (monitor.shouldCreateSingleAlertForFindings == true) {
monitor.triggers.forEach {
triggerResults[it.id] = runForEachDocTriggerCreateSingleGroupedAlert(
monitorResult,
it as DocumentLevelTrigger,
monitor,
queryToDocIds,
dryrun,
executionId,
workflowRunContext
)
}
}
}

Expand Down Expand Up @@ -348,6 +366,58 @@ class TransportDocLevelMonitorFanOutAction
}
}

/**
 * Runs a single doc-level trigger in "single grouped alert" mode.
 *
 * The trigger is evaluated against [queryToDocIds]. When it reports at least one triggered
 * document, exactly one alert is composed for the whole run (instead of one per finding),
 * every action of the trigger is executed with that alert in its context, and - unless this
 * is a dry run or the monitor has no id - the alert is saved through the alert service.
 *
 * @param monitorResult run result of the monitor so far; its error is propagated to the alert
 * and to the returned trigger result.
 * @param trigger the doc-level trigger to evaluate.
 * @param monitor the monitor that owns [trigger].
 * @param queryToDocIds document ids matched per doc-level query, handed to the trigger service.
 * @param dryrun when true, the composed alert is not saved.
 * @param executionId execution id stamped on the composed alert.
 * @param workflowRunContext workflow context, if any; its finding ids (when present) are
 * attached to the alert, otherwise an empty list is used.
 * @return a [DocumentLevelTriggerRunResult] carrying the trigger name, an empty triggered-docs
 * list and the monitor-level error, regardless of whether the trigger fired.
 */
private suspend fun runForEachDocTriggerCreateSingleGroupedAlert(
    monitorResult: MonitorRunResult<DocumentLevelTriggerRunResult>,
    trigger: DocumentLevelTrigger,
    monitor: Monitor,
    queryToDocIds: Map<DocLevelQuery, Set<String>>,
    dryrun: Boolean,
    executionId: String,
    workflowRunContext: WorkflowRunContext?
): DocumentLevelTriggerRunResult {
    val triggerResult = triggerService.runDocLevelTrigger(monitor, trigger, queryToDocIds)
    if (triggerResult.triggeredDocs.isNotEmpty()) {
        // Elvis yields a non-null list directly, so no `!!` is needed at the call site below.
        val findingIds = workflowRunContext?.findingIds ?: listOf()
        val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger)
        val alert = alertService.composeDocLevelAlert(
            findingIds,
            triggerResult.triggeredDocs,
            triggerCtx,
            monitorResult.alertError() ?: triggerResult.alertError(),
            executionId = executionId,
            // The parameter name below is spelled this way by the callee; keep it in sync with it.
            workflorwRunContext = workflowRunContext
        )
        for (action in trigger.actions) {
            this.runAction(action, triggerCtx.copy(alerts = listOf(AlertContext(alert))), monitor, dryrun)
        }

        if (!dryrun && monitor.id != Monitor.NO_ID) {
            // NOTE(review): the value returned by runAction above is not captured in this function,
            // so this lookup only finds entries if actionResultsMap is populated elsewhere - confirm.
            val actionResults = triggerResult.actionResultsMap.getOrDefault(alert.id, emptyMap())
            val actionExecutionResults = actionResults.values.map { actionRunResult ->
                ActionExecutionResult(actionRunResult.actionId, actionRunResult.executionTime, if (actionRunResult.throttled) 1 else 0)
            }
            val updatedAlert = alert.copy(actionExecutionResults = actionExecutionResults)

            // `let` pins one reference to the retry policy for the duration of the save call.
            retryPolicy.let {
                alertService.saveAlerts(
                    monitor.dataSources,
                    listOf(updatedAlert),
                    it,
                    routingId = monitor.id
                )
            }
        }
    }
    return DocumentLevelTriggerRunResult(trigger.name, listOf(), monitorResult.error)
}

private suspend fun runForEachDocTrigger(
monitorResult: MonitorRunResult<DocumentLevelTriggerRunResult>,
trigger: DocumentLevelTrigger,
Expand Down Expand Up @@ -511,7 +581,11 @@ class TransportDocLevelMonitorFanOutAction
.string()
log.debug("Findings: $findingStr")

if (shouldCreateFinding) {
if (shouldCreateFinding and (
monitor.shouldCreateSingleAlertForFindings == null ||
monitor.shouldCreateSingleAlertForFindings == false
)
) {
indexRequests += IndexRequest(monitor.dataSources.findingsIndex)
.source(findingStr, XContentType.JSON)
.id(finding.id)
Expand All @@ -523,13 +597,15 @@ class TransportDocLevelMonitorFanOutAction
bulkIndexFindings(monitor, indexRequests)
}

try {
findings.forEach { finding ->
publishFinding(monitor, finding)
if (monitor.shouldCreateSingleAlertForFindings == null || monitor.shouldCreateSingleAlertForFindings == false) {
try {
findings.forEach { finding ->
publishFinding(monitor, finding)
}
} catch (e: Exception) {
// suppress exception
log.error("Optional finding callback failed", e)
}
} catch (e: Exception) {
// suppress exception
log.error("Optional finding callback failed", e)
}
this.findingsToTriggeredQueries += findingsToTriggeredQueries

Expand Down Expand Up @@ -687,6 +763,7 @@ class TransportDocLevelMonitorFanOutAction
var to: Long = Long.MAX_VALUE
while (to >= from) {
val hits: SearchHits = searchShard(
monitor,
indexExecutionCtx.concreteIndexName,
shard,
from,
Expand Down Expand Up @@ -869,6 +946,7 @@ class TransportDocLevelMonitorFanOutAction
* This method hence fetches only docs from shard which haven't been queried before
*/
private suspend fun searchShard(
monitor: Monitor,
index: String,
shard: String,
prevSeqNo: Long?,
Expand All @@ -882,8 +960,16 @@ class TransportDocLevelMonitorFanOutAction
val boolQueryBuilder = BoolQueryBuilder()
boolQueryBuilder.filter(QueryBuilders.rangeQuery("_seq_no").gt(prevSeqNo).lte(maxSeqNo))

if (!docIds.isNullOrEmpty()) {
boolQueryBuilder.filter(QueryBuilders.termsQuery("_id", docIds))
if (monitor.shouldCreateSingleAlertForFindings == null || monitor.shouldCreateSingleAlertForFindings == false) {
if (!docIds.isNullOrEmpty()) {
boolQueryBuilder.filter(QueryBuilders.termsQuery("_id", docIds))
}
} else if (monitor.shouldCreateSingleAlertForFindings == true) {
val docIdsParam = mutableListOf<String>()
if (docIds != null) {
docIdsParam.addAll(docIds)
}
boolQueryBuilder.filter(QueryBuilders.termsQuery("_id", docIdsParam))
}

val request: SearchRequest = SearchRequest()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ object CompositeWorkflowRunner : WorkflowRunner() {
var lastErrorDelegateRun: Exception? = null

for (delegate in delegates) {
var indexToDocIds = mapOf<String, List<String>>()
var indexToDocIdsWithFindings: Pair<Map<String, List<String>>, List<String>>? = Pair(mapOf(), listOf())
var delegateMonitor: Monitor
delegateMonitor = monitorsById[delegate.monitorId]
?: throw AlertingException.wrap(
Expand All @@ -117,7 +117,7 @@ object CompositeWorkflowRunner : WorkflowRunner() {
}

try {
indexToDocIds = monitorCtx.workflowService!!.getFindingDocIdsByExecutionId(chainedMonitors, executionId)
indexToDocIdsWithFindings = monitorCtx.workflowService!!.getFindingDocIdsByExecutionId(chainedMonitors, executionId)
} catch (e: Exception) {
logger.error("Failed to execute workflow due to failure in chained findings. Error: ${e.message}", e)
return WorkflowRunResult(
Expand All @@ -130,9 +130,10 @@ object CompositeWorkflowRunner : WorkflowRunner() {
workflowId = workflowMetadata.workflowId,
workflowMetadataId = workflowMetadata.id,
chainedMonitorId = delegate.chainedMonitorFindings?.monitorId,
matchingDocIdsPerIndex = indexToDocIds,
matchingDocIdsPerIndex = indexToDocIdsWithFindings!!.first,
auditDelegateMonitorAlerts = if (workflow.auditDelegateMonitorAlerts == null) true
else workflow.auditDelegateMonitorAlerts!!
else workflow.auditDelegateMonitorAlerts!!,
findingIds = indexToDocIdsWithFindings.second
)
try {
dataSources = delegateMonitor.dataSources
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6235,4 +6235,120 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
)
)
}

fun `test execute workflow when bucket monitor is used in chained finding of ignored doc monitor`() {
    // Bucket-level delegate: composite terms aggregation on test_field_1 over a 10 day window.
    val dateRangeQuery = QueryBuilders.rangeQuery("test_strict_date_time")
        .gt("{{period_end}}||-10d")
        .lte("{{period_end}}")
        .format("epoch_millis")
    val termsSource = TermsValuesSourceBuilder("test_field_1").field("test_field_1")
    val compositeAgg = CompositeAggregationBuilder("composite_agg", listOf(termsSource))
    val searchSource = SearchSourceBuilder().size(0).query(dateRangeQuery).aggregation(compositeAgg)
    val input = SearchInput(indices = listOf(index), query = searchSource)

    // The bucket selector keeps only buckets holding more than one document after term grouping,
    // which narrows the doc ids handed on to the next delegate.
    val triggerScript = "params.docCount > 1"
    val baseTrigger = randomBucketLevelTrigger()
    val trigger = baseTrigger.copy(
        bucketSelector = BucketSelectorExtAggregationBuilder(
            name = baseTrigger.id,
            bucketsPathsMap = mapOf("docCount" to "_count"),
            script = Script(triggerScript),
            parentBucketPath = "composite_agg",
            filter = null
        )
    )

    val bucketCustomAlertsIndex = "custom_alerts_index"
    val bucketCustomFindingsIndex = "custom_findings_index"
    val bucketCustomFindingsIndexPattern = "custom_findings_index-1"
    val bucketDataSources = DataSources(
        findingsEnabled = true,
        alertsIndex = bucketCustomAlertsIndex,
        findingsIndex = bucketCustomFindingsIndex,
        findingsIndexPattern = bucketCustomFindingsIndexPattern
    )
    val bucketLevelMonitor = randomBucketLevelMonitor(
        inputs = listOf(input),
        enabled = false,
        triggers = listOf(trigger),
        dataSources = bucketDataSources
    )
    val bucketLevelMonitorResponse = createMonitor(bucketLevelMonitor)!!

    // Doc-level delegate: three queries, created with ignoreFindingsAndAlerts = true.
    val docQueries = listOf(
        DocLevelQuery(query = "test_field_1:\"test_value_2\"", name = "1", fields = listOf()),
        DocLevelQuery(query = "test_field_1:\"test_value_1\"", name = "2", fields = listOf()),
        DocLevelQuery(query = "test_field_1:\"test_value_3\"", name = "3", fields = listOf())
    )
    val docLevelInput = DocLevelMonitorInput("description", listOf(index), docQueries)
    val docTrigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
    val docCustomAlertsIndex = "custom_alerts_index"
    val docCustomFindingsIndex = "custom_findings_index"
    val docCustomFindingsIndexPattern = "custom_findings_index-1"
    val docDataSources = DataSources(
        alertsIndex = docCustomAlertsIndex,
        findingsIndex = docCustomFindingsIndex,
        findingsIndexPattern = docCustomFindingsIndexPattern
    )
    val docLevelMonitor = randomDocumentLevelMonitor(
        inputs = listOf(docLevelInput),
        triggers = listOf(docTrigger),
        dataSources = docDataSources,
        ignoreFindingsAndAlerts = true
    )
    val docLevelMonitorResponse = createMonitor(docLevelMonitor)!!

    // Workflow order: 1. bucketMonitor (chainedFinding = null) 2. docMonitor (chainedFinding = bucketMonitor)
    val workflow = randomWorkflow(
        monitorIds = listOf(bucketLevelMonitorResponse.id, docLevelMonitorResponse.id),
        enabled = false,
        auditDelegateMonitorAlerts = false
    )
    val workflowResponse = upsertWorkflow(workflow)!!
    val workflowById = searchWorkflow(workflowResponse.id)
    assertNotNull(workflowById)

    // Index 5 documents; the duplicated values give two buckets with more than one document.
    val sampleValues = listOf(
        "test_value_1",
        "test_value_1",
        "test_value_2",
        "test_value_2",
        "test_value_3"
    )
    insertSampleTimeSerializedData(index, sampleValues)

    val workflowId = workflowResponse.id
    val executeWorkflowResponse = executeWorkflow(workflowById, workflowId, false)!!
    assertNotNull(executeWorkflowResponse)

    val bucketMonitorName = bucketLevelMonitorResponse.monitor.name
    executeWorkflowResponse.workflowRunResult.monitorRunResults.forEach { runResult ->
        if (bucketMonitorName == runResult.monitorName) {
            // Bucket-level delegate: 3 composite buckets, 2 alerts, and findings over docs 1-4.
            val searchResult = runResult.inputResults.results.first()

            @Suppress("UNCHECKED_CAST")
            val buckets = searchResult.stringMap("aggregations")?.stringMap("composite_agg")
                ?.get("buckets") as List<kotlin.collections.Map<String, Any>>
            assertEquals("Incorrect search result", 3, buckets.size)

            val bucketAlerts = assertAlerts(bucketLevelMonitorResponse.id, bucketCustomAlertsIndex, 2, workflowId)
            assertAcknowledges(bucketAlerts.alerts, bucketLevelMonitorResponse.id, 2)
            assertFindings(bucketLevelMonitorResponse.id, bucketCustomFindingsIndex, 1, 4, listOf("1", "2", "3", "4"))
        } else {
            // Doc-level delegate: one input result, one trigger result, a single alert, no findings.
            assertEquals(1, runResult.inputResults.results.size)
            val triggerRunResults = runResult.triggerResults.values
            assertEquals(1, triggerRunResults.size)

            val docAlerts = assertAlerts(docLevelMonitorResponse.id, docCustomAlertsIndex, 1, workflowId)
            assertAcknowledges(docAlerts.alerts, docLevelMonitorResponse.id, 1)
            assertFindings(docLevelMonitorResponse.id, docCustomFindingsIndex, 0, 0, listOf("1", "2", "3", "4"))
        }
    }
}
}
Loading

0 comments on commit 507391c

Please sign in to comment.