Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "MessageTesting API filter errors" #17215

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion prime-router/src/main/kotlin/azure/ReportFunction.kt
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ class ReportFunction(
override var enrichmentSchemaWarnings: MutableList<String> = mutableListOf(),
override var receiverTransformErrors: MutableList<String> = mutableListOf(),
override var receiverTransformWarnings: MutableList<String> = mutableListOf(),
override var filterErrors: MutableList<ProcessFhirCommands.FilterError> = mutableListOf(),
override var filterErrors: MutableList<String> = mutableListOf(),
) : ProcessFhirCommands.MessageOrBundleParent()

/**
Expand Down
203 changes: 85 additions & 118 deletions prime-router/src/main/kotlin/cli/ProcessFhirCommands.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import gov.cdc.prime.router.Metadata
import gov.cdc.prime.router.MimeFormat
import gov.cdc.prime.router.Receiver
import gov.cdc.prime.router.Report
import gov.cdc.prime.router.ReportStreamFilterType
import gov.cdc.prime.router.ReportStreamFilter
import gov.cdc.prime.router.Topic
import gov.cdc.prime.router.azure.BlobAccess
import gov.cdc.prime.router.azure.ConditionStamper
Expand All @@ -41,16 +41,15 @@ import gov.cdc.prime.router.cli.CommandUtilities.Companion.abort
import gov.cdc.prime.router.cli.helpers.HL7DiffHelper
import gov.cdc.prime.router.common.Environment
import gov.cdc.prime.router.common.JacksonMapperUtilities
import gov.cdc.prime.router.config.validation.OrganizationValidation
import gov.cdc.prime.router.fhirengine.config.HL7TranslationConfig
import gov.cdc.prime.router.fhirengine.engine.FHIRReceiverFilter
import gov.cdc.prime.router.fhirengine.engine.FHIRReceiverFilter.FhirExpressionEvaluationResult
import gov.cdc.prime.router.fhirengine.engine.FHIRReceiverFilter.ReceiverFilterEvaluationResult
import gov.cdc.prime.router.fhirengine.engine.encodePreserveEncodingChars
import gov.cdc.prime.router.fhirengine.translation.HL7toFhirTranslator
import gov.cdc.prime.router.fhirengine.translation.hl7.FhirToHl7Context
import gov.cdc.prime.router.fhirengine.translation.hl7.FhirToHl7Converter
import gov.cdc.prime.router.fhirengine.translation.hl7.FhirTransformer
import gov.cdc.prime.router.fhirengine.translation.hl7.SchemaException
import gov.cdc.prime.router.fhirengine.translation.hl7.utils.CustomContext
import gov.cdc.prime.router.fhirengine.translation.hl7.utils.FhirPathUtils
import gov.cdc.prime.router.fhirengine.utils.FhirTranscoder
Expand Down Expand Up @@ -210,7 +209,7 @@ class ProcessFhirCommands : CliktCommand(
(outputFormat == MimeFormat.FHIR.toString()) ||
(receiver != null && receiver.format == MimeFormat.FHIR)
) -> {
val fhirMessage = convertHl7ToFhir(contents).first
val fhirMessage = convertHl7ToFhir(contents, receiver).first
messageOrBundle.bundle = fhirMessage
handleSendAndReceiverFhirEnrichments(messageOrBundle, receiver, senderSchema, isCli)

Expand Down Expand Up @@ -254,7 +253,7 @@ class ProcessFhirCommands : CliktCommand(
(receiver.format == MimeFormat.HL7 || receiver.format == MimeFormat.HL7_BATCH)
)
) -> {
val (bundle2, inputMessage) = convertHl7ToFhir(contents)
val (bundle2, inputMessage) = convertHl7ToFhir(contents, receiver)

messageOrBundle.bundle = bundle2
handleSendAndReceiverFhirEnrichments(messageOrBundle, receiver, senderSchema, isCli)
Expand Down Expand Up @@ -284,7 +283,7 @@ class ProcessFhirCommands : CliktCommand(
senderSchema: String?,
isCli: Boolean,
) {
stampObservations(messageOrBundle)
stampObservations(messageOrBundle, receiver)

val senderSchemaName = when {
senderSchema != null -> senderSchema
Expand All @@ -294,9 +293,7 @@ class ProcessFhirCommands : CliktCommand(

handleSenderTransforms(messageOrBundle, senderSchemaName)

if (receiver != null && messageOrBundle.bundle != null) {
handleReceiverFilters(receiver, messageOrBundle, isCli)
}
evaluateReceiverFilters(receiver, messageOrBundle, isCli)

val receiverEnrichmentSchemaNames = when {
receiver != null && receiver.enrichmentSchemaNames.isNotEmpty() -> {
Expand Down Expand Up @@ -326,128 +323,66 @@ class ProcessFhirCommands : CliktCommand(
}
}

fun handleReceiverFilters(receiver: Receiver, messageOrBundle: MessageOrBundle, isCli: Boolean) {
if (messageOrBundle.bundle?.identifier?.value == null) {
// this is just for logging so it is fine to just make it up
messageOrBundle.bundle?.identifier?.setValue(UUID.randomUUID().toString())
}
// TODO: https://github.com/CDCgov/prime-reportstream/issues/16407
val fhirReceiverFilter = FHIRReceiverFilter(reportStreamEventService = NoopReportStreamEventService())

evaluateReceiverFilters(receiver, messageOrBundle, fhirReceiverFilter)

applyConditionFilter(receiver, messageOrBundle, fhirReceiverFilter)

if (isCli && messageOrBundle.filterErrors.isNotEmpty()) {
val errorMsgLines = messageOrBundle.filterErrors.map { filterError ->
"${filterError.filterType} - ${filterError.message} : \n ${filterError.filter}"
}
throw CliktError(errorMsgLines.joinToString("\n"))
}
}

fun evaluateReceiverFilters(
receiver: Receiver,
messageOrBundle: MessageOrBundle,
fhirReceiverFilter: FHIRReceiverFilter,
) {
val bundle = messageOrBundle.bundle!!
val actionLogger = ActionLogger()

// filter groups for looped evaluation - condition filter evaluated separately
val fhirFilters = listOf(
Pair(receiver.jurisdictionalFilter, ReportStreamFilterType.JURISDICTIONAL_FILTER),
Pair(receiver.qualityFilter, ReportStreamFilterType.QUALITY_FILTER),
Pair(receiver.routingFilter, ReportStreamFilterType.ROUTING_FILTER),
Pair(receiver.processingModeFilter, ReportStreamFilterType.PROCESSING_MODE_FILTER),
)

fhirFilters.forEach {
val filter = it.first
val filterType = it.second
try {
val result = fhirReceiverFilter.evaluateFhirExpressionFilters(
receiver,
bundle,
actionLogger,
bundle.identifier.value,
filter,
filterType
)
if (result is FhirExpressionEvaluationResult.Failure) {
messageOrBundle.filterErrors.add(
FilterError(
result.failingFilter.filters.joinToString("\n"),
"Filter failed",
filterType.toString()
private fun evaluateReceiverFilters(receiver: Receiver?, messageOrBundle: MessageOrBundle, isCli: Boolean) {
if (receiver != null && messageOrBundle.bundle != null) {
val reportStreamFilters = mutableListOf<Pair<String, ReportStreamFilter>>()
reportStreamFilters.add(Pair("Jurisdictional Filter", receiver.jurisdictionalFilter))
reportStreamFilters.add(Pair("Quality Filter", receiver.qualityFilter))
reportStreamFilters.add(Pair("Routing Filter", receiver.routingFilter))
reportStreamFilters.add(Pair("Processing Mode Filter", receiver.processingModeFilter))

reportStreamFilters.forEach { reportStreamFilter ->
reportStreamFilter.second.forEach { filter ->
val validation = OrganizationValidation.validateFilter(filter)
if (!validation) {
messageOrBundle.filterErrors.add(
"Filter of type ${reportStreamFilter.first} is not valid. " +
"Value: '$filter'"
)
)
} else {
val result = FhirPathUtils.evaluate(
CustomContext(
messageOrBundle.bundle!!,
messageOrBundle.bundle!!,
mutableMapOf(),
CustomFhirPathFunctions()
),
messageOrBundle.bundle!!,
messageOrBundle.bundle!!,
filter
)
if (result.isEmpty() ||
(result[0].isBooleanPrimitive && result[0].primitiveValue() == "false")
) {
messageOrBundle.filterErrors.add(
"Filter '$filter' filtered out everything, nothing to return."
)
}
}
}
} catch (e: Exception) {
messageOrBundle.filterErrors.add(
FilterError(
filter.joinToString("\n"),
"Invalid filter - ${e.message}",
filterType.toString()
)
)
}
}
}

fun applyConditionFilter(
receiver: Receiver,
messageOrBundle: MessageOrBundle,
fhirReceiverFilter: FHIRReceiverFilter,
) {
val bundle = messageOrBundle.bundle!!

try {
val result = fhirReceiverFilter.evaluateObservationConditionFilters(
receiver,
bundle,
ActionLogger(),
bundle.identifier.value
)
receiver.conditionFilter.forEach { conditionFilter ->
val validation = OrganizationValidation.validateFilter(conditionFilter)
if (!validation) {
messageOrBundle.filterErrors.add("Condition filter '$conditionFilter' is not valid.")
}
}

if (result is ReceiverFilterEvaluationResult.Success) {
// update the bundle since observations might have gotten pruned after condition filter evaluation
messageOrBundle.bundle = result.bundle
} else {
result as ReceiverFilterEvaluationResult.Failure
messageOrBundle.filterErrors.add(
FilterError(
result.failingFilter.filters.joinToString("\n"),
"Filter failed",
result.failingFilter.filterType.toString()
)
)
if (isCli && messageOrBundle.filterErrors.isNotEmpty()) {
throw CliktError(messageOrBundle.filterErrors.joinToString("\n"))
}
} catch (e: SchemaException) {
messageOrBundle.filterErrors.add(
FilterError(
receiver.conditionFilter.joinToString("\n"),
"Invalid filter - ${e.message}",
ReportStreamFilterType.CONDITION_FILTER.toString()
)
)
}
}

data class FilterError(
val filter: String,
val message: String,
val filterType: String,
)

abstract class MessageOrBundleParent(
open var senderTransformErrors: MutableList<String> = mutableListOf(),
open var senderTransformWarnings: MutableList<String> = mutableListOf(),
open var enrichmentSchemaErrors: MutableList<String> = mutableListOf(),
open var enrichmentSchemaWarnings: MutableList<String> = mutableListOf(),
open var receiverTransformErrors: MutableList<String> = mutableListOf(),
open var receiverTransformWarnings: MutableList<String> = mutableListOf(),
open var filterErrors: MutableList<FilterError> = mutableListOf(),
open var filterErrors: MutableList<String> = mutableListOf(),
)

class MessageOrBundle(
Expand All @@ -459,10 +394,34 @@ class ProcessFhirCommands : CliktCommand(
override var enrichmentSchemaWarnings: MutableList<String> = mutableListOf(),
override var receiverTransformErrors: MutableList<String> = mutableListOf(),
override var receiverTransformWarnings: MutableList<String> = mutableListOf(),
override var filterErrors: MutableList<FilterError> = mutableListOf(),
override var filterErrors: MutableList<String> = mutableListOf(),
) : MessageOrBundleParent()

private fun getReceiver(
private fun applyConditionFilter(receiver: Receiver, bundle: Bundle): Bundle {
val trackingId = if (bundle.id != null) {
bundle.id
} else {
// this is just for logging so it is fine to just make it up
UUID.randomUUID().toString()
}
// TODO: https://github.com/CDCgov/prime-reportstream/issues/16407
val result =
FHIRReceiverFilter(
reportStreamEventService = NoopReportStreamEventService()
).evaluateObservationConditionFilters(
receiver,
bundle,
ActionLogger(),
trackingId
)
if (result is ReceiverFilterEvaluationResult.Success) {
return result.bundle
} else {
throw CliktError("Condition filter failed.")
}
}

fun getReceiver(
environment: Environment,
receiverName: String?,
orgName: String?,
Expand Down Expand Up @@ -548,11 +507,15 @@ class ProcessFhirCommands : CliktCommand(

private fun stampObservations(
messageOrBundle: MessageOrBundle,
receiver: Receiver?,
) {
val stamper = ConditionStamper(LookupTableConditionMapper(Metadata.getInstance()))
messageOrBundle.bundle?.getObservations()?.forEach { observation ->
stamper.stampObservation(observation)
}
if (receiver != null) {
messageOrBundle.bundle = applyConditionFilter(receiver, messageOrBundle.bundle!!)
}
}

/**
Expand All @@ -563,7 +526,7 @@ class ProcessFhirCommands : CliktCommand(
* look like.
* @return a FHIR bundle and the parsed HL7 input that represents the data in the one HL7 message
*/
private fun convertHl7ToFhir(hl7String: String): Pair<Bundle, Message> {
private fun convertHl7ToFhir(hl7String: String, receiver: Receiver?): Pair<Bundle, Message> {
val hasFiveEncodingChars = hl7MessageHasFiveEncodingChars(hl7String)
// Some HL7 2.5.1 implementations have adopted the truncation character # that was added in 2.7
// However, the library used to encode the HL7 message throws an error it there are more than 4 encoding
Expand All @@ -587,6 +550,10 @@ class ProcessFhirCommands : CliktCommand(
stamper.stampObservation(observation)
}

if (receiver != null) {
fhirMessage = applyConditionFilter(receiver, fhirMessage)
}

return Pair(fhirMessage, hl7message)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ class FHIRReceiverFilter(
*
* [actionLogger], [trackingId], and [filterType] facilitate logging
*/
fun evaluateFhirExpressionFilters(
private fun evaluateFhirExpressionFilters(
receiver: Receiver,
bundle: Bundle,
actionLogger: ActionLogger,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ object FhirPathUtils : Logging {
}
} catch (e: Exception) {
val msg = when (e) {
is FHIRLexer.FHIRLexerException -> "Syntax error: ${e.message} in FHIR Path expression $expression"
is FHIRLexer.FHIRLexerException -> "Syntax error in FHIR Path expression $expression"
is SchemaException -> e.message.toString()
else ->
"Unknown error while evaluating FHIR Path expression $expression for condition. " +
Expand Down
Loading