Skip to content

Commit

Permalink
Mike/6932 universal pipeline for all (#6964)
Browse files Browse the repository at this point in the history
* Send all test events to universal pipeline

* Update tests

* Allow covid only for fhir reporting service
  • Loading branch information
mpbrown authored Nov 17, 2023
1 parent 98fd5fd commit 777b249
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,13 @@ public final class AzureStorageQueueFhirReportingService implements TestEventRep

@Override
public CompletableFuture<Void> reportAsync(TestEvent testEvent) {
if (testEvent.getResults().stream()
.anyMatch(result -> !COVID_LOINC.equals(result.getDisease().getLoinc()))) {
log.trace("Dispatching TestEvent [{}] to Azure storage queue", testEvent.getInternalId());
var parser = context.newJsonParser();
return queueClient
.sendMessage(
parser.encodeResourceToString(
fhirConverter.createFhirBundle(testEvent, gitProperties, processingModeCode)))
.toFuture()
.thenApply(result -> null);
}
return CompletableFuture.completedFuture(null);
log.trace("Dispatching TestEvent [{}] to Azure storage queue", testEvent.getInternalId());
var parser = context.newJsonParser();
return queueClient
.sendMessage(
parser.encodeResourceToString(
fhirConverter.createFhirBundle(testEvent, gitProperties, processingModeCode)))
.toFuture()
.thenApply(result -> null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ private void reportTestEventToRS(TestEvent savedEvent) {
testEventReportingService.report(savedEvent);
}

if (savedEvent.hasFluResult() && fhirReportingEnabled) {
if (fhirReportingEnabled) {
fhirQueueReportingService.report(savedEvent);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,10 @@ void reportAsync_CovidOnly() {
var context = spy(FhirContext.class);
var client = mock(QueueAsyncClient.class);
AzureStorageQueueFhirReportingService service =
new AzureStorageQueueFhirReportingService(context, client, null, fhirConverter);
new AzureStorageQueueFhirReportingService(context, client, gitProperties, fhirConverter);

var multiplexTestEvent = createCovidTestEvent();
ReflectionTestUtils.setField(multiplexTestEvent, "internalId", UUID.randomUUID());
ReflectionTestUtils.setField(multiplexTestEvent.getPatient(), "internalId", UUID.randomUUID());
ReflectionTestUtils.setField(
multiplexTestEvent.getPatient(),
Expand All @@ -90,8 +91,9 @@ void reportAsync_CovidOnly() {
.getResults()
.forEach(result -> ReflectionTestUtils.setField(result, "internalId", UUID.randomUUID()));

when(client.sendMessage(anyString())).thenReturn(Mono.create(MonoSink::success));
service.reportAsync(multiplexTestEvent);
verify(context, times(0)).newJsonParser();
verify(client, times(0)).sendMessage(anyString());
verify(context, times(1)).newJsonParser();
verify(client, times(1)).sendMessage(anyString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ void roundTrip() {

// make sure the corrected event is sent to storage queue
verify(testEventReportingService).report(testEventArgumentCaptor.capture());
verifyNoInteractions(fhirQueueReportingService);
verify(fhirQueueReportingService).report(any());
TestEvent sentEvent = testEventArgumentCaptor.getValue();
TestResult testResult = sentEvent.getCovidTestResult().get();
assertThat(sentEvent.getPatient().getInternalId()).isEqualTo(patient.getInternalId());
Expand Down Expand Up @@ -236,7 +236,7 @@ void addTestResult_populateFirstTest() {
null);

verify(testEventReportingService).report(any());
verifyNoInteractions(fhirQueueReportingService);
verify(fhirQueueReportingService).report(any());

List<TestEvent> testEvents =
_testEventRepository.findAllByPatientAndFacilities(p, List.of(facility));
Expand All @@ -257,7 +257,7 @@ void addTestResult_populateFirstTest() {
assertThat(testEvents.get(0).getPatientHasPriorTests()).isFalse();
assertThat(testEvents.get(1).getPatientHasPriorTests()).isTrue();
verify(testEventReportingService, times(2)).report(any());
verifyNoInteractions(fhirQueueReportingService);
verify(fhirQueueReportingService, times(2)).report(any());
}

@Test
Expand Down Expand Up @@ -408,7 +408,7 @@ void addTestResult_orgAdmin_ok() {
List<TestOrder> queue = _service.getQueue(facility.getInternalId());
assertEquals(0, queue.size());
verify(testEventReportingService).report(any());
verifyNoInteractions(fhirQueueReportingService);
verify(fhirQueueReportingService).report(any());
}

@Test
Expand Down Expand Up @@ -454,7 +454,7 @@ void addTestResult_standardUserAllFacilities_ok() {
List<TestOrder> queue = _service.getQueue(facility.getInternalId());
assertEquals(0, queue.size());
verify(testEventReportingService).report(any());
verifyNoInteractions(fhirQueueReportingService);
verify(fhirQueueReportingService).report(any());
}

@Test
Expand Down Expand Up @@ -563,7 +563,7 @@ void addTestResult_standardUser_successDependsOnFacilityAccess() {

// make sure the corrected event is sent to storage queue
verify(testEventReportingService).report(any());
verifyNoInteractions(fhirQueueReportingService);
verify(fhirQueueReportingService).report(any());

List<MultiplexResultInput> negativeCovidResult = makeCovidOnlyResult(TestResult.NEGATIVE);

Expand All @@ -575,7 +575,7 @@ void addTestResult_standardUser_successDependsOnFacilityAccess() {

// make sure the second event is sent to storage queue
verify(testEventReportingService, times(2)).report(any());
verifyNoInteractions(fhirQueueReportingService);
verify(fhirQueueReportingService, times(2)).report(any());
}

@Test
Expand Down Expand Up @@ -934,7 +934,7 @@ void addTestResult_NoTestResultDelivery() {
assertTrue(res.getDeliverySuccess());
verifyNoInteractions(testResultsDeliveryService);
verify(testEventReportingService).report(any());
verifyNoInteractions(fhirQueueReportingService);
verify(fhirQueueReportingService).report(any());
}

@Test
Expand Down Expand Up @@ -1466,7 +1466,7 @@ void markAsErrorTest() {
// make sure the corrected event is sent to storage queue, which gets picked up to be delivered
// to report stream
verify(testEventReportingService).report(deleteMarkerEvent);
verifyNoInteractions(fhirQueueReportingService);
verify(fhirQueueReportingService).report(any());
}

@Test
Expand Down Expand Up @@ -2149,7 +2149,7 @@ void markAsErrorTest_successDependsOnFacilityAccess() {
_service.getTestResult(_e.getInternalId()).getTestOrder();
// make sure the corrected event is sent to storage queue
verify(testEventReportingService).report(correctedTestEvent);
verifyNoInteractions(fhirQueueReportingService);
verify(fhirQueueReportingService).report(any());
}

@Test
Expand Down

0 comments on commit 777b249

Please sign in to comment.