Skip to content

Commit

Permalink
Revert "MessageTesting API filter errors (#17142)"
Browse files Browse the repository at this point in the history
This reverts commit 253f770.
  • Loading branch information
victor-chaparro authored Jan 29, 2025
1 parent 253f770 commit e49b834
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 384 deletions.
2 changes: 1 addition & 1 deletion prime-router/src/main/kotlin/azure/ReportFunction.kt
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ class ReportFunction(
override var enrichmentSchemaWarnings: MutableList<String> = mutableListOf(),
override var receiverTransformErrors: MutableList<String> = mutableListOf(),
override var receiverTransformWarnings: MutableList<String> = mutableListOf(),
override var filterErrors: MutableList<ProcessFhirCommands.FilterError> = mutableListOf(),
override var filterErrors: MutableList<String> = mutableListOf(),
) : ProcessFhirCommands.MessageOrBundleParent()

/**
Expand Down
203 changes: 85 additions & 118 deletions prime-router/src/main/kotlin/cli/ProcessFhirCommands.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import gov.cdc.prime.router.Metadata
import gov.cdc.prime.router.MimeFormat
import gov.cdc.prime.router.Receiver
import gov.cdc.prime.router.Report
import gov.cdc.prime.router.ReportStreamFilterType
import gov.cdc.prime.router.ReportStreamFilter
import gov.cdc.prime.router.Topic
import gov.cdc.prime.router.azure.BlobAccess
import gov.cdc.prime.router.azure.ConditionStamper
Expand All @@ -41,16 +41,15 @@ import gov.cdc.prime.router.cli.CommandUtilities.Companion.abort
import gov.cdc.prime.router.cli.helpers.HL7DiffHelper
import gov.cdc.prime.router.common.Environment
import gov.cdc.prime.router.common.JacksonMapperUtilities
import gov.cdc.prime.router.config.validation.OrganizationValidation
import gov.cdc.prime.router.fhirengine.config.HL7TranslationConfig
import gov.cdc.prime.router.fhirengine.engine.FHIRReceiverFilter
import gov.cdc.prime.router.fhirengine.engine.FHIRReceiverFilter.FhirExpressionEvaluationResult
import gov.cdc.prime.router.fhirengine.engine.FHIRReceiverFilter.ReceiverFilterEvaluationResult
import gov.cdc.prime.router.fhirengine.engine.encodePreserveEncodingChars
import gov.cdc.prime.router.fhirengine.translation.HL7toFhirTranslator
import gov.cdc.prime.router.fhirengine.translation.hl7.FhirToHl7Context
import gov.cdc.prime.router.fhirengine.translation.hl7.FhirToHl7Converter
import gov.cdc.prime.router.fhirengine.translation.hl7.FhirTransformer
import gov.cdc.prime.router.fhirengine.translation.hl7.SchemaException
import gov.cdc.prime.router.fhirengine.translation.hl7.utils.CustomContext
import gov.cdc.prime.router.fhirengine.translation.hl7.utils.FhirPathUtils
import gov.cdc.prime.router.fhirengine.utils.FhirTranscoder
Expand Down Expand Up @@ -210,7 +209,7 @@ class ProcessFhirCommands : CliktCommand(
(outputFormat == MimeFormat.FHIR.toString()) ||
(receiver != null && receiver.format == MimeFormat.FHIR)
) -> {
val fhirMessage = convertHl7ToFhir(contents).first
val fhirMessage = convertHl7ToFhir(contents, receiver).first
messageOrBundle.bundle = fhirMessage
handleSendAndReceiverFhirEnrichments(messageOrBundle, receiver, senderSchema, isCli)

Expand Down Expand Up @@ -254,7 +253,7 @@ class ProcessFhirCommands : CliktCommand(
(receiver.format == MimeFormat.HL7 || receiver.format == MimeFormat.HL7_BATCH)
)
) -> {
val (bundle2, inputMessage) = convertHl7ToFhir(contents)
val (bundle2, inputMessage) = convertHl7ToFhir(contents, receiver)

messageOrBundle.bundle = bundle2
handleSendAndReceiverFhirEnrichments(messageOrBundle, receiver, senderSchema, isCli)
Expand Down Expand Up @@ -284,7 +283,7 @@ class ProcessFhirCommands : CliktCommand(
senderSchema: String?,
isCli: Boolean,
) {
stampObservations(messageOrBundle)
stampObservations(messageOrBundle, receiver)

val senderSchemaName = when {
senderSchema != null -> senderSchema
Expand All @@ -294,9 +293,7 @@ class ProcessFhirCommands : CliktCommand(

handleSenderTransforms(messageOrBundle, senderSchemaName)

if (receiver != null && messageOrBundle.bundle != null) {
handleReceiverFilters(receiver, messageOrBundle, isCli)
}
evaluateReceiverFilters(receiver, messageOrBundle, isCli)

val receiverEnrichmentSchemaNames = when {
receiver != null && receiver.enrichmentSchemaNames.isNotEmpty() -> {
Expand Down Expand Up @@ -326,128 +323,66 @@ class ProcessFhirCommands : CliktCommand(
}
}

fun handleReceiverFilters(receiver: Receiver, messageOrBundle: MessageOrBundle, isCli: Boolean) {
if (messageOrBundle.bundle?.identifier?.value == null) {
// this is just for logging so it is fine to just make it up
messageOrBundle.bundle?.identifier?.setValue(UUID.randomUUID().toString())
}
// TODO: https://github.com/CDCgov/prime-reportstream/issues/16407
val fhirReceiverFilter = FHIRReceiverFilter(reportStreamEventService = NoopReportStreamEventService())

evaluateReceiverFilters(receiver, messageOrBundle, fhirReceiverFilter)

applyConditionFilter(receiver, messageOrBundle, fhirReceiverFilter)

if (isCli && messageOrBundle.filterErrors.isNotEmpty()) {
val errorMsgLines = messageOrBundle.filterErrors.map { filterError ->
"${filterError.filterType} - ${filterError.message} : \n ${filterError.filter}"
}
throw CliktError(errorMsgLines.joinToString("\n"))
}
}

fun evaluateReceiverFilters(
receiver: Receiver,
messageOrBundle: MessageOrBundle,
fhirReceiverFilter: FHIRReceiverFilter,
) {
val bundle = messageOrBundle.bundle!!
val actionLogger = ActionLogger()

// filter groups for looped evaluation - condition filter evaluated separately
val fhirFilters = listOf(
Pair(receiver.jurisdictionalFilter, ReportStreamFilterType.JURISDICTIONAL_FILTER),
Pair(receiver.qualityFilter, ReportStreamFilterType.QUALITY_FILTER),
Pair(receiver.routingFilter, ReportStreamFilterType.ROUTING_FILTER),
Pair(receiver.processingModeFilter, ReportStreamFilterType.PROCESSING_MODE_FILTER),
)

fhirFilters.forEach {
val filter = it.first
val filterType = it.second
try {
val result = fhirReceiverFilter.evaluateFhirExpressionFilters(
receiver,
bundle,
actionLogger,
bundle.identifier.value,
filter,
filterType
)
if (result is FhirExpressionEvaluationResult.Failure) {
messageOrBundle.filterErrors.add(
FilterError(
result.failingFilter.filters.joinToString("\n"),
"Filter failed",
filterType.toString()
private fun evaluateReceiverFilters(receiver: Receiver?, messageOrBundle: MessageOrBundle, isCli: Boolean) {
if (receiver != null && messageOrBundle.bundle != null) {
val reportStreamFilters = mutableListOf<Pair<String, ReportStreamFilter>>()
reportStreamFilters.add(Pair("Jurisdictional Filter", receiver.jurisdictionalFilter))
reportStreamFilters.add(Pair("Quality Filter", receiver.qualityFilter))
reportStreamFilters.add(Pair("Routing Filter", receiver.routingFilter))
reportStreamFilters.add(Pair("Processing Mode Filter", receiver.processingModeFilter))

reportStreamFilters.forEach { reportStreamFilter ->
reportStreamFilter.second.forEach { filter ->
val validation = OrganizationValidation.validateFilter(filter)
if (!validation) {
messageOrBundle.filterErrors.add(
"Filter of type ${reportStreamFilter.first} is not valid. " +
"Value: '$filter'"
)
)
} else {
val result = FhirPathUtils.evaluate(
CustomContext(
messageOrBundle.bundle!!,
messageOrBundle.bundle!!,
mutableMapOf(),
CustomFhirPathFunctions()
),
messageOrBundle.bundle!!,
messageOrBundle.bundle!!,
filter
)
if (result.isEmpty() ||
(result[0].isBooleanPrimitive && result[0].primitiveValue() == "false")
) {
messageOrBundle.filterErrors.add(
"Filter '$filter' filtered out everything, nothing to return."
)
}
}
}
} catch (e: Exception) {
messageOrBundle.filterErrors.add(
FilterError(
filter.joinToString("\n"),
"Invalid filter - ${e.message}",
filterType.toString()
)
)
}
}
}

fun applyConditionFilter(
receiver: Receiver,
messageOrBundle: MessageOrBundle,
fhirReceiverFilter: FHIRReceiverFilter,
) {
val bundle = messageOrBundle.bundle!!

try {
val result = fhirReceiverFilter.evaluateObservationConditionFilters(
receiver,
bundle,
ActionLogger(),
bundle.identifier.value
)
receiver.conditionFilter.forEach { conditionFilter ->
val validation = OrganizationValidation.validateFilter(conditionFilter)
if (!validation) {
messageOrBundle.filterErrors.add("Condition filter '$conditionFilter' is not valid.")
}
}

if (result is ReceiverFilterEvaluationResult.Success) {
// update the bundle since observations might have gotten pruned after condition filter evaluation
messageOrBundle.bundle = result.bundle
} else {
result as ReceiverFilterEvaluationResult.Failure
messageOrBundle.filterErrors.add(
FilterError(
result.failingFilter.filters.joinToString("\n"),
"Filter failed",
result.failingFilter.filterType.toString()
)
)
if (isCli && messageOrBundle.filterErrors.isNotEmpty()) {
throw CliktError(messageOrBundle.filterErrors.joinToString("\n"))
}
} catch (e: SchemaException) {
messageOrBundle.filterErrors.add(
FilterError(
receiver.conditionFilter.joinToString("\n"),
"Invalid filter - ${e.message}",
ReportStreamFilterType.CONDITION_FILTER.toString()
)
)
}
}

data class FilterError(
val filter: String,
val message: String,
val filterType: String,
)

abstract class MessageOrBundleParent(
open var senderTransformErrors: MutableList<String> = mutableListOf(),
open var senderTransformWarnings: MutableList<String> = mutableListOf(),
open var enrichmentSchemaErrors: MutableList<String> = mutableListOf(),
open var enrichmentSchemaWarnings: MutableList<String> = mutableListOf(),
open var receiverTransformErrors: MutableList<String> = mutableListOf(),
open var receiverTransformWarnings: MutableList<String> = mutableListOf(),
open var filterErrors: MutableList<FilterError> = mutableListOf(),
open var filterErrors: MutableList<String> = mutableListOf(),
)

class MessageOrBundle(
Expand All @@ -459,10 +394,34 @@ class ProcessFhirCommands : CliktCommand(
override var enrichmentSchemaWarnings: MutableList<String> = mutableListOf(),
override var receiverTransformErrors: MutableList<String> = mutableListOf(),
override var receiverTransformWarnings: MutableList<String> = mutableListOf(),
override var filterErrors: MutableList<FilterError> = mutableListOf(),
override var filterErrors: MutableList<String> = mutableListOf(),
) : MessageOrBundleParent()

private fun getReceiver(
private fun applyConditionFilter(receiver: Receiver, bundle: Bundle): Bundle {
val trackingId = if (bundle.id != null) {
bundle.id
} else {
// this is just for logging so it is fine to just make it up
UUID.randomUUID().toString()
}
// TODO: https://github.com/CDCgov/prime-reportstream/issues/16407
val result =
FHIRReceiverFilter(
reportStreamEventService = NoopReportStreamEventService()
).evaluateObservationConditionFilters(
receiver,
bundle,
ActionLogger(),
trackingId
)
if (result is ReceiverFilterEvaluationResult.Success) {
return result.bundle
} else {
throw CliktError("Condition filter failed.")
}
}

fun getReceiver(
environment: Environment,
receiverName: String?,
orgName: String?,
Expand Down Expand Up @@ -548,11 +507,15 @@ class ProcessFhirCommands : CliktCommand(

private fun stampObservations(
messageOrBundle: MessageOrBundle,
receiver: Receiver?,
) {
val stamper = ConditionStamper(LookupTableConditionMapper(Metadata.getInstance()))
messageOrBundle.bundle?.getObservations()?.forEach { observation ->
stamper.stampObservation(observation)
}
if (receiver != null) {
messageOrBundle.bundle = applyConditionFilter(receiver, messageOrBundle.bundle!!)
}
}

/**
Expand All @@ -563,7 +526,7 @@ class ProcessFhirCommands : CliktCommand(
* look like.
* @return a FHIR bundle and the parsed HL7 input that represents the data in the one HL7 message
*/
private fun convertHl7ToFhir(hl7String: String): Pair<Bundle, Message> {
private fun convertHl7ToFhir(hl7String: String, receiver: Receiver?): Pair<Bundle, Message> {
val hasFiveEncodingChars = hl7MessageHasFiveEncodingChars(hl7String)
// Some HL7 2.5.1 implementations have adopted the truncation character # that was added in 2.7
// However, the library used to encode the HL7 message throws an error it there are more than 4 encoding
Expand All @@ -587,6 +550,10 @@ class ProcessFhirCommands : CliktCommand(
stamper.stampObservation(observation)
}

if (receiver != null) {
fhirMessage = applyConditionFilter(receiver, fhirMessage)
}

return Pair(fhirMessage, hl7message)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ class FHIRReceiverFilter(
*
* [actionLogger], [trackingId], and [filterType] facilitate logging
*/
fun evaluateFhirExpressionFilters(
private fun evaluateFhirExpressionFilters(
receiver: Receiver,
bundle: Bundle,
actionLogger: ActionLogger,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ object FhirPathUtils : Logging {
}
} catch (e: Exception) {
val msg = when (e) {
is FHIRLexer.FHIRLexerException -> "Syntax error: ${e.message} in FHIR Path expression $expression"
is FHIRLexer.FHIRLexerException -> "Syntax error in FHIR Path expression $expression"
is SchemaException -> e.message.toString()
else ->
"Unknown error while evaluating FHIR Path expression $expression for condition. " +
Expand Down
Loading

0 comments on commit e49b834

Please sign in to comment.