Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fetch Individual Geographies Separately from Redis #4866

Merged
merged 4 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions data-browser/src/main/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@
<appender-ref ref="STDOUT" />
</appender>

<root level="DEBUG">
<root level="INFO">
<appender-ref ref="ASYNC" />
</root>
<logger name="logger.slick.session" level="DEBUG"/>
<logger name="slick.jdbc.DriverDataSource" level="DEBUG"/>
<logger name="slick.basic.BasicBackend.stream" level="DEBUG"/>
<logger name="slick.basic.BasicBackend.action" level="DEBUG"/>
<logger name="logger.slick.session" level="INFO"/>
<logger name="slick.jdbc.DriverDataSource" level="INFO"/>
<logger name="slick.basic.BasicBackend.stream" level="INFO"/>
<logger name="slick.basic.BasicBackend.action" level="INFO"/>
</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,16 @@ case class FieldInfo(name: String, value: String)
case class Aggregation(count: Long, sum: Double, fields: List[FieldInfo])

object Aggregation {

private object constants {
val Count = "count"
val Sum = "sum"
}

implicit def sumAggregation(aggregation: Aggregation, secondAggregation: Aggregation): Aggregation = {
Aggregation(aggregation.count + secondAggregation.count, aggregation.sum + secondAggregation.sum, aggregation.fields)
}

// Scala => JSON
implicit val encoder: Encoder[Aggregation] = (agg: Aggregation) => {
val jsonKVs: List[(String, Json)] = List(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package hmda.dataBrowser.services
import akka.NotUsed
import akka.stream.scaladsl.Source
import hmda.dataBrowser.models._
import hmda.dataBrowser.models.Aggregation._
import hmda.dataBrowser.repositories._
import monix.eval.Task
import org.slf4j.Logger
Expand Down Expand Up @@ -79,27 +80,13 @@ class DataBrowserQueryService(repoLatest: ModifiedLarRepositoryLatest, repo2017:

val queryFieldCombinations = permuteQueryFields(hmdaFilters)

println("combinations: " + queryFieldCombinations)
val year = queryFields.year.toInt

Task.parSequenceUnordered {
queryFieldCombinations.map { combination =>
val fieldInfos = combination.map(field => FieldInfo(field.name, field.value))
println("combinations: " + queryFieldCombinations)

// the year is a special case as the data selected depends on the year
val year = queryFields.year.toInt
println("about to get result: " + combination)
cacheResult (
cacheLookup = cache.find(optLEI.getOrElse(QueryField()), geoFilter.getOrElse(QueryField()), combination, year),
onMiss = repo.findAndAggregate(optLEI.getOrElse(QueryField()), geoFilter.getOrElse(QueryField()), combination, year),
cacheUpdate = cache.update(optLEI.getOrElse(QueryField()), geoFilter.getOrElse(QueryField()), combination, year, _: Statistic)
).map { case (from, statistic) => (from, Aggregation(statistic.count, statistic.sum, fieldInfos)) }
}
}.map(results =>
results.foldLeft((ServedFrom.Cache: ServedFrom, List.empty[Aggregation])) {
case ((servedAcc, aggAcc), (nextServed, nextAgg)) =>
(servedAcc.combine(nextServed), nextAgg :: aggAcc)
}
)
if (geoFilter.nonEmpty) {
getMultiGeos(queryFieldCombinations, geoFilter, optLEI, year, repoLatest, cache)
} else getNational(queryFieldCombinations, optLEI, year, repoLatest, cache)
}

override def fetchFilers(queryFields: QueryFields): Task[(ServedFrom, FilerInstitutionResponseLatest)] = {
Expand All @@ -120,4 +107,77 @@ class DataBrowserQueryService(repoLatest: ModifiedLarRepositoryLatest, repo2017:
)
}

def getNational(queryFieldCombinations: List[List[LarQueryField]], optLEI: Option[QueryField], year: Int, repo: ModifiedLarRepositoryLatest, cache: Cache): Task[(ServedFrom, Seq[Aggregation])] = {
Task.parSequenceUnordered {
queryFieldCombinations.map { combination =>
val fieldInfos = combination.map(field => FieldInfo(field.name, field.value))

println("about to get result: " + combination)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

println can be removed

cacheResult (
cacheLookup = cache.find(optLEI.getOrElse(QueryField()),QueryField(), combination, year),
onMiss = repo.findAndAggregate(optLEI.getOrElse(QueryField()), QueryField(), combination, year),
cacheUpdate = cache.update(optLEI.getOrElse(QueryField()), QueryField(), combination, year, _: Statistic)
).map { case (from, statistic) => (from, Aggregation(statistic.count, statistic.sum, fieldInfos)) }
}
}.map(results =>
results.foldLeft((ServedFrom.Cache: ServedFrom, List.empty[Aggregation])) {
case ((servedAcc, aggAcc), (nextServed, nextAgg)) =>
(servedAcc.combine(nextServed), nextAgg :: aggAcc)
}
)
}

def getMultiGeos(queryFieldCombinations: List[List[LarQueryField]], geoFilter: Option[QueryField], optLEI: Option[QueryField], year: Int, repo: ModifiedLarRepositoryLatest, cache: Cache): Task[(ServedFrom, Seq[Aggregation])] = {
val multiGeoCombinationsSeq: Seq[Task[(ServedFrom, List[Aggregation])]] = queryFieldCombinations.map { combination =>
val fieldInfos = combination.map(field => FieldInfo(field.name, field.value))

val geoListTask: Seq[Task[(ServedFrom, Aggregation)]] = geoFilter.getOrElse(QueryField()).values.map { singleGeoCombination =>
val singleGeoFilter = geoFilter.getOrElse(QueryField()).copy(values = Seq(singleGeoCombination))
cacheResult (
cacheLookup = cache.find(optLEI.getOrElse(QueryField()), singleGeoFilter, combination, year),
onMiss = repo.findAndAggregate(optLEI.getOrElse(QueryField()), singleGeoFilter, combination, year),
cacheUpdate = cache.update(optLEI.getOrElse(QueryField()), singleGeoFilter, combination, year, _: Statistic)
).map { case (from, statistic) => (from, Aggregation(statistic.count, statistic.sum, fieldInfos)) }
}

val singleGeoCombinations: Task[(ServedFrom, List[Aggregation])] = {
val singleGeoTaskList: Task[Seq[(ServedFrom, Aggregation)]] = Task.sequence(geoListTask)
singleGeoTaskList.map { results => results.foldLeft((ServedFrom.Cache: ServedFrom, List.empty[Aggregation])) {
case ((servedAcc, aggAcc), (nextServed, nextAgg)) =>
(servedAcc.combine(nextServed), nextAgg :: aggAcc)
}
}
}

singleGeoCombinations
}

val multiGeoCombinationsTask: Task[Seq[(ServedFrom, List[Aggregation])]] = Task.sequence(multiGeoCombinationsSeq)

val combinedAggregation: Task[(ServedFrom, Seq[Aggregation])] = multiGeoCombinationsTask.map { results =>
results.foldLeft((ServedFrom.Cache: ServedFrom, List.empty[Aggregation])) {
case ((servedAcc, aggAcc), (nextServed, nextAgg)) =>
(servedAcc.combine(nextServed), nextAgg ::: aggAcc)
}
}

combinedAggregation.map(result => (result._1, sumAggregations(result._2)))
}

def sumAggregations(initialAgg: Seq[Aggregation]): Seq[Aggregation] = {
var tempMap = Map[List[FieldInfo], Aggregation]()
initialAgg.foreach{ agg =>
val aggOption = tempMap.get(agg.fields)
aggOption match {
case Some(tempAgg) => {
val newAgg = Aggregation(agg.count + tempAgg.count, agg.sum + tempAgg.sum, agg.fields)
tempMap = tempMap + (agg.fields -> newAgg)
}
case None => {
tempMap = tempMap + (agg.fields -> agg)
}
}
}
tempMap.values.toSeq
}
}
Loading