diff --git a/data-browser/src/main/resources/logback.xml b/data-browser/src/main/resources/logback.xml index f6f9a676fc..b4f2d0162a 100644 --- a/data-browser/src/main/resources/logback.xml +++ b/data-browser/src/main/resources/logback.xml @@ -10,11 +10,11 @@ - + - - - - + + + + \ No newline at end of file diff --git a/data-browser/src/main/scala/hmda/dataBrowser/models/Aggregation.scala b/data-browser/src/main/scala/hmda/dataBrowser/models/Aggregation.scala index 910ab4eb45..726b9cddaa 100644 --- a/data-browser/src/main/scala/hmda/dataBrowser/models/Aggregation.scala +++ b/data-browser/src/main/scala/hmda/dataBrowser/models/Aggregation.scala @@ -9,11 +9,16 @@ case class FieldInfo(name: String, value: String) case class Aggregation(count: Long, sum: Double, fields: List[FieldInfo]) object Aggregation { + private object constants { val Count = "count" val Sum = "sum" } + implicit def sumAggregation(aggregation: Aggregation, secondAggregation: Aggregation): Aggregation = { + Aggregation(aggregation.count + secondAggregation.count, aggregation.sum + secondAggregation.sum, aggregation.fields) + } + // Scala => JSON implicit val encoder: Encoder[Aggregation] = (agg: Aggregation) => { val jsonKVs: List[(String, Json)] = List( diff --git a/data-browser/src/main/scala/hmda/dataBrowser/services/DataBrowserQueryService.scala b/data-browser/src/main/scala/hmda/dataBrowser/services/DataBrowserQueryService.scala index d597991092..4f63c86867 100644 --- a/data-browser/src/main/scala/hmda/dataBrowser/services/DataBrowserQueryService.scala +++ b/data-browser/src/main/scala/hmda/dataBrowser/services/DataBrowserQueryService.scala @@ -2,6 +2,7 @@ package hmda.dataBrowser.services import akka.NotUsed import akka.stream.scaladsl.Source import hmda.dataBrowser.models._ +import hmda.dataBrowser.models.Aggregation._ import hmda.dataBrowser.repositories._ import monix.eval.Task import org.slf4j.Logger @@ -79,27 +80,13 @@ class DataBrowserQueryService(repoLatest: ModifiedLarRepositoryLatest, repo2017: val queryFieldCombinations = permuteQueryFields(hmdaFilters) - println("combinations: " + queryFieldCombinations) + val year = queryFields.year.toInt - Task.parSequenceUnordered { - queryFieldCombinations.map { combination => - val fieldInfos = combination.map(field => FieldInfo(field.name, field.value)) + println("combinations: " + queryFieldCombinations) - // the year is a special case as the data selected depends on the year - val year = queryFields.year.toInt - println("about to get result: " + combination) - cacheResult ( - cacheLookup = cache.find(optLEI.getOrElse(QueryField()), geoFilter.getOrElse(QueryField()), combination, year), - onMiss = repo.findAndAggregate(optLEI.getOrElse(QueryField()), geoFilter.getOrElse(QueryField()), combination, year), - cacheUpdate = cache.update(optLEI.getOrElse(QueryField()), geoFilter.getOrElse(QueryField()), combination, year, _: Statistic) - ).map { case (from, statistic) => (from, Aggregation(statistic.count, statistic.sum, fieldInfos)) } - } - }.map(results => - results.foldLeft((ServedFrom.Cache: ServedFrom, List.empty[Aggregation])) { - case ((servedAcc, aggAcc), (nextServed, nextAgg)) => - (servedAcc.combine(nextServed), nextAgg :: aggAcc) - } - ) + if (geoFilter.nonEmpty) { + getMultiGeos(queryFieldCombinations, geoFilter, optLEI, year, repoLatest, cache) + } else getNational(queryFieldCombinations, optLEI, year, repoLatest, cache) } override def fetchFilers(queryFields: QueryFields): Task[(ServedFrom, FilerInstitutionResponseLatest)] = { @@ -120,4 +107,77 @@ class DataBrowserQueryService(repoLatest: ModifiedLarRepositoryLatest, repo2017: ) } + def getNational(queryFieldCombinations: List[List[LarQueryField]], optLEI: Option[QueryField], year: Int, repo: ModifiedLarRepositoryLatest, cache: Cache): Task[(ServedFrom, Seq[Aggregation])] = { + Task.parSequenceUnordered { + queryFieldCombinations.map { combination => + val fieldInfos = combination.map(field => FieldInfo(field.name, field.value)) + + println("about to get result: " + combination) + cacheResult ( + cacheLookup = cache.find(optLEI.getOrElse(QueryField()),QueryField(), combination, year), + onMiss = repo.findAndAggregate(optLEI.getOrElse(QueryField()), QueryField(), combination, year), + cacheUpdate = cache.update(optLEI.getOrElse(QueryField()), QueryField(), combination, year, _: Statistic) + ).map { case (from, statistic) => (from, Aggregation(statistic.count, statistic.sum, fieldInfos)) } + } + }.map(results => + results.foldLeft((ServedFrom.Cache: ServedFrom, List.empty[Aggregation])) { + case ((servedAcc, aggAcc), (nextServed, nextAgg)) => + (servedAcc.combine(nextServed), nextAgg :: aggAcc) + } + ) + } + + def getMultiGeos(queryFieldCombinations: List[List[LarQueryField]], geoFilter: Option[QueryField], optLEI: Option[QueryField], year: Int, repo: ModifiedLarRepositoryLatest, cache: Cache): Task[(ServedFrom, Seq[Aggregation])] = { + val multiGeoCombinationsSeq: Seq[Task[(ServedFrom, List[Aggregation])]] = queryFieldCombinations.map { combination => + val fieldInfos = combination.map(field => FieldInfo(field.name, field.value)) + + val geoListTask: Seq[Task[(ServedFrom, Aggregation)]] = geoFilter.getOrElse(QueryField()).values.map { singleGeoCombination => + val singleGeoFilter = geoFilter.getOrElse(QueryField()).copy(values = Seq(singleGeoCombination)) + cacheResult ( + cacheLookup = cache.find(optLEI.getOrElse(QueryField()), singleGeoFilter, combination, year), + onMiss = repo.findAndAggregate(optLEI.getOrElse(QueryField()), singleGeoFilter, combination, year), + cacheUpdate = cache.update(optLEI.getOrElse(QueryField()), singleGeoFilter, combination, year, _: Statistic) + ).map { case (from, statistic) => (from, Aggregation(statistic.count, statistic.sum, fieldInfos)) } + } + + val singleGeoCombinations: Task[(ServedFrom, List[Aggregation])] = { + val singleGeoTaskList: Task[Seq[(ServedFrom, Aggregation)]] = Task.sequence(geoListTask) + singleGeoTaskList.map { results => results.foldLeft((ServedFrom.Cache: ServedFrom, List.empty[Aggregation])) { + case ((servedAcc, aggAcc), (nextServed, nextAgg)) => + (servedAcc.combine(nextServed), nextAgg :: aggAcc) + } + } + } + + singleGeoCombinations + } + + val multiGeoCombinationsTask: Task[Seq[(ServedFrom, List[Aggregation])]] = Task.sequence(multiGeoCombinationsSeq) + + val combinedAggregation: Task[(ServedFrom, Seq[Aggregation])] = multiGeoCombinationsTask.map { results => + results.foldLeft((ServedFrom.Cache: ServedFrom, List.empty[Aggregation])) { + case ((servedAcc, aggAcc), (nextServed, nextAgg)) => + (servedAcc.combine(nextServed), nextAgg ::: aggAcc) + } + } + + combinedAggregation.map(result => (result._1, sumAggregations(result._2))) + } + + def sumAggregations(initialAgg: Seq[Aggregation]): Seq[Aggregation] = { + var tempMap = Map[List[FieldInfo], Aggregation]() + initialAgg.foreach{ agg => + val aggOption = tempMap.get(agg.fields) + aggOption match { + case Some(tempAgg) => { + val newAgg = Aggregation(agg.count + tempAgg.count, agg.sum + tempAgg.sum, agg.fields) + tempMap = tempMap + (agg.fields -> newAgg) + } + case None => { + tempMap = tempMap + (agg.fields -> agg) + } + } + } + tempMap.values.toSeq + } } \ No newline at end of file