Skip to content

Commit

Permalink
Merge branch 'develop' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
net-cscience-raphael committed Nov 28, 2023
2 parents 4c67f53 + f7ef1d6 commit 35ec3b1
Show file tree
Hide file tree
Showing 7 changed files with 126 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,18 @@ import io.github.oshai.kotlinlogging.KotlinLogging
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.takeWhile
import org.vitrivr.engine.core.config.pipeline.Pipeline
import org.vitrivr.engine.core.config.pipeline.PipelineBuilder
import org.vitrivr.engine.core.model.metamodel.Schema
import org.vitrivr.engine.core.operators.Operator
import org.vitrivr.engine.core.operators.ingest.AbstractSegmenter
import org.vitrivr.engine.core.operators.ingest.Extractor
import org.vitrivr.engine.core.config.pipeline.Pipeline
import org.vitrivr.engine.core.config.pipeline.PipelineBuilder
import java.util.UUID
import java.util.concurrent.BlockingQueue
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.LinkedBlockingDeque


private val logger: KLogger = KotlinLogging.logger {}

Expand All @@ -20,35 +25,72 @@ private val logger: KLogger = KotlinLogging.logger {}
* @author Ralph Gasser
* @version 1.0.0
*/
class ExecutionServer {
class ExecutionServer private constructor(schema: Schema){

companion object {

@Volatile private var instances: MutableMap<Schema, ExecutionServer> = mutableMapOf()

fun getInstance(schema: Schema) =
instances[schema] ?: synchronized(this) { // synchronized to avoid concurrency problem
instances[schema] ?: ExecutionServer(schema).also { instances[schema] = it }
}
}


/** The [ExecutorService] used to execution [] */
private val executor: ExecutorService = Executors.newCachedThreadPool()

/** The [CoroutineDispatcher] used for execution. */
private val dispatcher: CoroutineDispatcher = this.executor.asCoroutineDispatcher()

private lateinit var operators: List<Operator<*>>
var indexJobQueue: BlockingQueue<Pair<Pipeline,UUID>> = LinkedBlockingDeque()

init {
this.run()
}

fun isPending(uuid: UUID): Int {
return this.indexJobQueue.indexOf(this.indexJobQueue.find { it.second == uuid })
}

fun enqueueIndexJob(pipeline: Pipeline): UUID {
val uuid = UUID.randomUUID()
return this.enqueueIndexJob(pipeline, uuid)
}

fun enqueueIndexJob(pipeline: Pipeline, uuid: UUID): UUID{
this.indexJobQueue.add(Pair(pipeline, uuid))
return uuid;
}

/**
* Executes an extraction job using a [List] of [Extractor]s.
*
* @param extractors The [List] of [Extractor]s to execute.
*/
fun extract(pipeline: Pipeline) = runBlocking {
private fun extract(pipeline: Pipeline) = runBlocking {
val scope = CoroutineScope(this@ExecutionServer.dispatcher)
val jobs = pipeline.getLeaves().map { e -> scope.launch { e.toFlow(scope).takeWhile { it != AbstractSegmenter.TerminalRetrievable }.collect() } }
jobs.forEach { it.join() }
}


fun addOperatorPipeline(operatorPipeline: PipelineBuilder){
this.operators = operatorPipeline.getPipeline().getLeaves();
}

fun execute() = runBlocking {
val scope = CoroutineScope(this@ExecutionServer.dispatcher)
val jobs = this@ExecutionServer.operators.map { e -> scope.launch { e.toFlow(scope).takeWhile() { it != AbstractSegmenter.TerminalRetrievable }.collect() } }
jobs.forEach { it.join() }
private fun run() {
Thread {
val running = true
while (running) {
val pipeline = indexJobQueue.take()
try {
this.extract(pipeline.first)
logger.debug { "Extraction with pipeline '${pipeline.second}' finished." }
} catch (e: Exception) {
logger.error { "Error while executing extraction job: ${e.message}" }
}
// wait
Thread.sleep(10000)
}
}.start()
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import io.github.oshai.kotlinlogging.KotlinLogging
import org.vitrivr.engine.core.config.IndexConfig
import org.vitrivr.engine.core.config.pipeline.Pipeline
import org.vitrivr.engine.core.config.pipeline.PipelineBuilder
import org.vitrivr.engine.core.config.pipeline.execution.ExecutionServer
import org.vitrivr.engine.core.context.IndexContext
import org.vitrivr.engine.core.context.QueryContext
import org.vitrivr.engine.core.database.Connection
Expand Down Expand Up @@ -42,6 +43,9 @@ class Schema(val name: String = "vitrivr", val connection: Connection) : Closeab
/** The [List] of [Exporter]s contained in this [Schema]. */
private val exporters: MutableList<Schema.Exporter> = mutableListOf()

/** The [List] of [Pipeline]s contained in this [Schema]. */
private val executionServer: ExecutionServer = ExecutionServer.getInstance(this)

/** The [List] of [Pipeline]s contained in this [Schema]. */
private val extractionPipelines: MutableMap<String, PipelineBuilder> = mutableMapOf()

Expand Down Expand Up @@ -117,6 +121,8 @@ class Schema(val name: String = "vitrivr", val connection: Connection) : Closeab
fun getPipelineBuilder(key: String): PipelineBuilder = this.extractionPipelines[key]
?: throw IllegalArgumentException("No pipeline with key '$key' found in schema '$name'.")

fun getExecutionServer(): ExecutionServer = this.executionServer

/**
* Closes this [Schema] and the associated database [Connection].
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ fun main(args: Array<String>) {
}

/* Initialize retrieval runtime. */
val executionServer = ExecutionServer()
val runtime = RetrievalRuntime()

/* Prepare Javalin endpoint. */
Expand Down Expand Up @@ -85,7 +84,7 @@ fun main(args: Array<String>) {
/* Prepare CLI endpoint. */
val cli = Cli(manager)
for (schema in manager.listSchemas()) {
cli.register(SchemaCommand(schema, executionServer))
cli.register(SchemaCommand(schema, schema.getExecutionServer()))
}

/* Start the Javalin and CLI. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ class SchemaCommand(private val schema: Schema, private val server: ExecutionSer
val config = IndexConfig.read(Paths.get(IndexConfig.DEFAULT_PIPELINE_PATH)) ?: return
val pipelineBuilder = PipelineBuilder.forConfig(this.schema, config)
val pipeline = pipelineBuilder.getPipeline()
this.server.extract(pipeline)
schema.getExecutionServer().enqueueIndexJob(pipeline)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@ package org.vitrivr.engine.server.api.rest
import io.javalin.apibuilder.ApiBuilder.*
import org.vitrivr.engine.core.model.metamodel.SchemaManager
import org.vitrivr.engine.query.execution.RetrievalRuntime
import org.vitrivr.engine.server.api.rest.handlers.executeIngest
import org.vitrivr.engine.server.api.rest.handlers.executeQuery
import org.vitrivr.engine.server.api.rest.handlers.fetchExportData
import org.vitrivr.engine.server.api.rest.handlers.listSchemas
import org.vitrivr.engine.server.api.rest.handlers.*
import org.vitrivr.engine.server.config.ApiConfig


Expand Down Expand Up @@ -36,6 +33,9 @@ fun configureApiRoutes(config: ApiConfig, manager: SchemaManager, retrievalRunti
path(schema.name) {
if (config.index) {
post("index") { ctx -> executeIngest(ctx, schema) }
path("index") {
get("{id}") { ctx -> executeIngestStatus(ctx, schema) }
}
}

if (config.retrieval) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
package org.vitrivr.engine.server.api.rest.handlers

import io.github.oshai.kotlinlogging.KLogger
import io.github.oshai.kotlinlogging.KotlinLogging
import io.javalin.http.Context
import io.javalin.openapi.*
import io.javalin.util.FileUtil
import org.vitrivr.engine.core.config.pipeline.execution.ExecutionServer
import org.vitrivr.engine.core.model.metamodel.Schema
import org.vitrivr.engine.server.api.rest.model.ErrorStatus
import org.vitrivr.engine.server.api.rest.model.ErrorStatusException
import java.nio.file.Path
import java.util.*
import kotlin.io.path.deleteIfExists


private val logger: KLogger = KotlinLogging.logger {}

/**
*
* @author Ralph Gasser
* @author Raphael
* @version 1.0
*/
@OpenApi(
Expand Down Expand Up @@ -42,8 +48,9 @@ fun executeIngest(ctx: Context, schema: Schema) {
}
val filestream: MutableList<Path> = mutableListOf()
// folder with threadId to avoid deleting files from other threads
val threadId = Thread.currentThread().hashCode().toString() + Thread.currentThread().id.toString()
val basepath = Path.of("upload/$threadId/")
val uuid = UUID.randomUUID();
val basepath = Path.of("upload/$uuid/")

try {
ctx.uploadedFiles("data").forEach { uploadedFile ->
val path = Path.of("$basepath/${uploadedFile.filename()}")
Expand All @@ -54,13 +61,55 @@ fun executeIngest(ctx: Context, schema: Schema) {
val pipelineBuilder = pipelineName?.let { schema.getPipelineBuilder(it) }
?: throw ErrorStatusException(400, "Invalid request: Pipeline '$pipelineName' does not exist.")
val pipeline = pipelineBuilder.getApiPipeline(stream)
val server = ExecutionServer().extract(pipeline)

schema.getExecutionServer().enqueueIndexJob(pipeline, uuid)
ctx.json(mapOf("id" to uuid))

} catch (e: Exception) {
throw ErrorStatusException(400, "Invalid request: ${e.message}")
} finally {
filestream.forEach() {
file -> file.deleteIfExists()
filestream.forEach() { file ->
file.deleteIfExists()
}
basepath.deleteIfExists()
}
}

/**
*
* @author Raphael
* @version 1.0
*/
@OpenApi(
path = "/api/{schema}/index/{id}",
methods = [HttpMethod.GET],
summary = "Indexes an item, adding it to the defined schema.",
operationId = "postExecuteIngest",
tags = ["Ingest"],
pathParams = [
OpenApiParam(
"schema",
type = String::class,
description = "The name of the schema to execute a query for.",
required = true
), OpenApiParam(
"id",
type = String::class,
description = "The id querying the state.",
required = true
)
],
responses = [
OpenApiResponse("200", [OpenApiContent(Any::class)]),
OpenApiResponse("400", [OpenApiContent(ErrorStatus::class)])
]
)
fun executeIngestStatus(ctx: Context, schema: Schema) {
val id = try {
UUID.fromString(ctx.pathParam("id"))
} catch (e: Exception) {
throw ErrorStatusException(400, "Invalid request: ${e.message}")
}
val status = schema.getExecutionServer().isPending(id)
ctx.json(mapOf("status" to status))
}
8 changes: 4 additions & 4 deletions vitrivr-engine-server/src/main/resources/log4j2.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@ Valid values for this attribute are "off", "trace", "debug", "info", "warn", "er
<logger name="org" level="INFO" additivity="true">
</logger>
<!-- For all in current package, overwrites org -->
<logger name="org.vitrivr.engine.base.*" level="INFO" additivity="true">
<logger name="org.vitrivr.engine.base" level="DEBUG" additivity="true">
</logger>
<logger name="org.vitrivr.engine.core.*" level="INFO" additivity="true">
<logger name="org.vitrivr.engine.core" level="DEBUG" additivity="true">
</logger>
<logger name="org.vitrivr.engine.index" level="DEBUG" additivity="true">
</logger>
<logger name="org.vitrivr.engine.query.*" level="INFO" additivity="true">
<logger name="org.vitrivr.engine.query" level="DEBUG" additivity="true">
</logger>
<logger name="org.vitrivr.engine.server.*" level="INFO" additivity="true">
<logger name="org.vitrivr.engine.server" level="DEBUG" additivity="true">
</logger>
<!-- Root is fall back config if no matching logger is found -->
<Root level="TRACE">
Expand Down

0 comments on commit 35ec3b1

Please sign in to comment.