Skip to content

Commit

Permalink
The RelationExpander only adds RelationshipAttribute to incoming Retr…
Browse files Browse the repository at this point in the history
…ieved.
  • Loading branch information
Ralph Gasser committed Apr 3, 2024
1 parent 68ed8c6 commit 715f4e1
Showing 1 changed file with 45 additions and 38 deletions.
Original file line number Diff line number Diff line change
@@ -1,72 +1,79 @@
package org.vitrivr.engine.query.transform

import io.github.oshai.kotlinlogging.KLogger
import io.github.oshai.kotlinlogging.KotlinLogging
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import org.vitrivr.engine.core.database.retrievable.RetrievableReader
import org.vitrivr.engine.core.model.retrievable.Relationship
import org.vitrivr.engine.core.model.retrievable.RetrievableId
import org.vitrivr.engine.core.model.retrievable.Retrieved
import org.vitrivr.engine.core.model.retrievable.attributes.RelationshipAttribute
import org.vitrivr.engine.core.operators.Operator
import org.vitrivr.engine.core.operators.retrieve.Transformer

/**
* Appends [RelationshipAttribute] to a [Retrieved] by expanding the specified incoming and outgoing relationships.
*
* @version 1.1.0
* @author Luca Rossetto
* @author Ralph Gasser
*/
class RelationExpander(
override val input: Operator<Retrieved>,
private val incomingRelations: List<String>,
private val outgoingRelations: List<String>,
private val retrievableReader: RetrievableReader
) : Transformer {

private val logger: KLogger = KotlinLogging.logger {}

override fun toFlow(scope: CoroutineScope): Flow<Retrieved> = flow {

/* Collect input into list. */
val inputRetrieved = input.toFlow(scope).toList()

/* Fetch relation entries for the provided IDs. */
val ids = inputRetrieved.map { it.id }.toSet()

if (ids.isEmpty()) {
return@flow
val (incoming, outgoing) = if (ids.isEmpty()) {
(if (this@RelationExpander.incomingRelations.isNotEmpty()) {
this@RelationExpander.retrievableReader.getConnections(emptyList(), this@RelationExpander.incomingRelations, ids)
} else {
emptySequence()
}.groupBy { it.first }

to

if (this@RelationExpander.outgoingRelations.isNotEmpty()) {
this@RelationExpander.retrievableReader.getConnections(ids, this@RelationExpander.outgoingRelations, emptyList())
} else {
emptySequence()
}.groupBy { it.third })
} else {
emptyMap<RetrievableId, List<Triple<RetrievableId,String,RetrievableId>>>() to emptyMap()
}

val relations = (if (incomingRelations.isNotEmpty()) {
retrievableReader.getConnections(emptyList(), incomingRelations, ids)
/* Collection IDs that are new and fetch corresponding retrievable. */
val newIds = (incoming.keys + outgoing.keys) - ids
val newRetrievables = if (newIds.isNotEmpty()) {
retrievableReader.getAll(newIds.toList())
} else {
emptySequence()
} + if (outgoingRelations.isNotEmpty()) {
retrievableReader.getConnections(ids, outgoingRelations, emptyList())
} else {
emptySequence()
}).toList()

val newIds = relations.flatMap { listOf(it.first, it.third) } - ids

val newRetrievables = if (newIds.isNotEmpty()) retrievableReader.getAll(newIds.toList()) else emptySequence()

val allRetrieved = (inputRetrieved + newRetrievables.map { Retrieved(it) }).associateBy { it.id }

}.map {
Retrieved(it)
}.associateBy {
it.id
}

for (relation in relations) {
val sub = allRetrieved[relation.first]
val obj = allRetrieved[relation.third]
if (sub == null || obj == null) {
logger.error { "Undefined relationship ${relation}" }
continue
/* Iterate over input and emit each retrievable with expanded relationships. */
inputRetrieved.forEach {
/* Expand incoming relationships. */
for (inc in (incoming[it.id] ?: emptyList())) {
it.addAttribute(RelationshipAttribute(Relationship(inc.first to newRetrievables[inc.first], inc.second, inc.third to newRetrievables[inc.third])))
}

/* Expand outgoing relationships. */
for (out in (outgoing[it.id] ?: emptyList())) {
it.addAttribute(RelationshipAttribute(Relationship(out.third to newRetrievables[out.third], out.second, out.first to newRetrievables[out.first])))
}

val expandedRelation = Relationship(sub, relation.second, obj)
val attribute = RelationshipAttribute(expandedRelation)

sub.addAttribute(attribute)
obj.addAttribute(attribute)
}


allRetrieved.values.forEach {
/* Emit. */
emit(it)
}
}
Expand Down

0 comments on commit 715f4e1

Please sign in to comment.