diff --git a/vitrivr-engine-core/src/main/kotlin/org/vitrivr/engine/core/config/pipeline/execution/ExecutionServer.kt b/vitrivr-engine-core/src/main/kotlin/org/vitrivr/engine/core/config/pipeline/execution/ExecutionServer.kt index 13618242..99846e8b 100644 --- a/vitrivr-engine-core/src/main/kotlin/org/vitrivr/engine/core/config/pipeline/execution/ExecutionServer.kt +++ b/vitrivr-engine-core/src/main/kotlin/org/vitrivr/engine/core/config/pipeline/execution/ExecutionServer.kt @@ -5,13 +5,18 @@ import io.github.oshai.kotlinlogging.KotlinLogging import kotlinx.coroutines.* import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.takeWhile +import org.vitrivr.engine.core.config.pipeline.Pipeline +import org.vitrivr.engine.core.config.pipeline.PipelineBuilder +import org.vitrivr.engine.core.model.metamodel.Schema import org.vitrivr.engine.core.operators.Operator import org.vitrivr.engine.core.operators.ingest.AbstractSegmenter import org.vitrivr.engine.core.operators.ingest.Extractor -import org.vitrivr.engine.core.config.pipeline.Pipeline -import org.vitrivr.engine.core.config.pipeline.PipelineBuilder +import java.util.UUID +import java.util.concurrent.BlockingQueue import java.util.concurrent.ExecutorService import java.util.concurrent.Executors +import java.util.concurrent.LinkedBlockingDeque + private val logger: KLogger = KotlinLogging.logger {} @@ -20,35 +25,72 @@ private val logger: KLogger = KotlinLogging.logger {} * @author Ralph Gasser * @version 1.0.0 */ -class ExecutionServer { +class ExecutionServer private constructor(schema: Schema){ + + companion object { + + @Volatile private var instances: MutableMap = mutableMapOf() + + fun getInstance(schema: Schema) = + instances[schema] ?: synchronized(this) { // synchronized to avoid concurrency problem + instances[schema] ?: ExecutionServer(schema).also { instances[schema] = it } + } + } + + /** The [ExecutorService] used to execution [] */ private val executor: ExecutorService = Executors.newCachedThreadPool() /** The [CoroutineDispatcher] used for execution. */ private val dispatcher: CoroutineDispatcher = this.executor.asCoroutineDispatcher() - private lateinit var operators: List> + var indexJobQueue: BlockingQueue> = LinkedBlockingDeque() + + init { + this.run() + } + + fun isPending(uuid: UUID): Int { + return this.indexJobQueue.indexOf(this.indexJobQueue.find { it.second == uuid }) + } + + fun enqueueIndexJob(pipeline: Pipeline): UUID { + val uuid = UUID.randomUUID() + return this.enqueueIndexJob(pipeline, uuid) + } + + fun enqueueIndexJob(pipeline: Pipeline, uuid: UUID): UUID{ + this.indexJobQueue.add(Pair(pipeline, uuid)) + return uuid; + } /** * Executes an extraction job using a [List] of [Extractor]s. * * @param extractors The [List] of [Extractor]s to execute. */ - fun extract(pipeline: Pipeline) = runBlocking { + private fun extract(pipeline: Pipeline) = runBlocking { val scope = CoroutineScope(this@ExecutionServer.dispatcher) val jobs = pipeline.getLeaves().map { e -> scope.launch { e.toFlow(scope).takeWhile { it != AbstractSegmenter.TerminalRetrievable }.collect() } } jobs.forEach { it.join() } } - fun addOperatorPipeline(operatorPipeline: PipelineBuilder){ - this.operators = operatorPipeline.getPipeline().getLeaves(); - } - - fun execute() = runBlocking { - val scope = CoroutineScope(this@ExecutionServer.dispatcher) - val jobs = this@ExecutionServer.operators.map { e -> scope.launch { e.toFlow(scope).takeWhile() { it != AbstractSegmenter.TerminalRetrievable }.collect() } } - jobs.forEach { it.join() } + private fun run() { + Thread { + val running = true + while (running) { + val pipeline = indexJobQueue.take() + try { + this.extract(pipeline.first) + logger.debug { "Extraction with pipeline '${pipeline.second}' finished." } + } catch (e: Exception) { + logger.error { "Error while executing extraction job: ${e.message}" } + } + // wait + Thread.sleep(10000) + } + }.start() } /** diff --git a/vitrivr-engine-core/src/main/kotlin/org/vitrivr/engine/core/model/metamodel/Schema.kt b/vitrivr-engine-core/src/main/kotlin/org/vitrivr/engine/core/model/metamodel/Schema.kt index e99aeae9..c8f1553d 100644 --- a/vitrivr-engine-core/src/main/kotlin/org/vitrivr/engine/core/model/metamodel/Schema.kt +++ b/vitrivr-engine-core/src/main/kotlin/org/vitrivr/engine/core/model/metamodel/Schema.kt @@ -5,6 +5,7 @@ import io.github.oshai.kotlinlogging.KotlinLogging import org.vitrivr.engine.core.config.IndexConfig import org.vitrivr.engine.core.config.pipeline.Pipeline import org.vitrivr.engine.core.config.pipeline.PipelineBuilder +import org.vitrivr.engine.core.config.pipeline.execution.ExecutionServer import org.vitrivr.engine.core.context.IndexContext import org.vitrivr.engine.core.context.QueryContext import org.vitrivr.engine.core.database.Connection @@ -42,6 +43,9 @@ class Schema(val name: String = "vitrivr", val connection: Connection) : Closeab /** The [List] of [Exporter]s contained in this [Schema]. */ private val exporters: MutableList = mutableListOf() + /** The [List] of [Pipeline]s contained in this [Schema]. */ + private val executionServer: ExecutionServer = ExecutionServer.getInstance(this) + /** The [List] of [Pipeline]s contained in this [Schema]. */ private val extractionPipelines: MutableMap = mutableMapOf() @@ -117,6 +121,8 @@ class Schema(val name: String = "vitrivr", val connection: Connection) : Closeab fun getPipelineBuilder(key: String): PipelineBuilder = this.extractionPipelines[key] ?: throw IllegalArgumentException("No pipeline with key '$key' found in schema '$name'.") + fun getExecutionServer(): ExecutionServer = this.executionServer + /** * Closes this [Schema] and the associated database [Connection]. */ diff --git a/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/Main.kt b/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/Main.kt index ca421280..a760fc6c 100644 --- a/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/Main.kt +++ b/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/Main.kt @@ -39,7 +39,6 @@ fun main(args: Array) { } /* Initialize retrieval runtime. */ - val executionServer = ExecutionServer() val runtime = RetrievalRuntime() /* Prepare Javalin endpoint. */ @@ -85,7 +84,7 @@ fun main(args: Array) { /* Prepare CLI endpoint. */ val cli = Cli(manager) for (schema in manager.listSchemas()) { - cli.register(SchemaCommand(schema, executionServer)) + cli.register(SchemaCommand(schema, schema.getExecutionServer())) } /* Start the Javalin and CLI. */ diff --git a/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/cli/commands/SchemaCommand.kt b/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/cli/commands/SchemaCommand.kt index ed275863..0be44406 100644 --- a/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/cli/commands/SchemaCommand.kt +++ b/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/cli/commands/SchemaCommand.kt @@ -99,7 +99,7 @@ class SchemaCommand(private val schema: Schema, private val server: ExecutionSer val config = IndexConfig.read(Paths.get(IndexConfig.DEFAULT_PIPELINE_PATH)) ?: return val pipelineBuilder = PipelineBuilder.forConfig(this.schema, config) val pipeline = pipelineBuilder.getPipeline() - this.server.extract(pipeline) + schema.getExecutionServer().enqueueIndexJob(pipeline) } } } \ No newline at end of file diff --git a/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/rest/Routes.kt b/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/rest/Routes.kt index cfac3b42..fc6494b3 100644 --- a/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/rest/Routes.kt +++ b/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/rest/Routes.kt @@ -3,10 +3,7 @@ package org.vitrivr.engine.server.api.rest import io.javalin.apibuilder.ApiBuilder.* import org.vitrivr.engine.core.model.metamodel.SchemaManager import org.vitrivr.engine.query.execution.RetrievalRuntime -import org.vitrivr.engine.server.api.rest.handlers.executeIngest -import org.vitrivr.engine.server.api.rest.handlers.executeQuery -import org.vitrivr.engine.server.api.rest.handlers.fetchExportData -import org.vitrivr.engine.server.api.rest.handlers.listSchemas +import org.vitrivr.engine.server.api.rest.handlers.* import org.vitrivr.engine.server.config.ApiConfig @@ -36,6 +33,9 @@ fun configureApiRoutes(config: ApiConfig, manager: SchemaManager, retrievalRunti path(schema.name) { if (config.index) { post("index") { ctx -> executeIngest(ctx, schema) } + path("index") { + get("{id}") { ctx -> executeIngestStatus(ctx, schema) } + } } if (config.retrieval) { diff --git a/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/rest/handlers/Index.kt b/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/rest/handlers/Index.kt index d04e3a29..8e4d3447 100644 --- a/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/rest/handlers/Index.kt +++ b/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/rest/handlers/Index.kt @@ -1,18 +1,24 @@ package org.vitrivr.engine.server.api.rest.handlers +import io.github.oshai.kotlinlogging.KLogger +import io.github.oshai.kotlinlogging.KotlinLogging import io.javalin.http.Context import io.javalin.openapi.* import io.javalin.util.FileUtil -import org.vitrivr.engine.core.config.pipeline.execution.ExecutionServer import org.vitrivr.engine.core.model.metamodel.Schema import org.vitrivr.engine.server.api.rest.model.ErrorStatus import org.vitrivr.engine.server.api.rest.model.ErrorStatusException import java.nio.file.Path +import java.util.* import kotlin.io.path.deleteIfExists + +private val logger: KLogger = KotlinLogging.logger {} + /** * * @author Ralph Gasser + * @author Raphael * @version 1.0 */ @OpenApi( @@ -42,8 +48,9 @@ fun executeIngest(ctx: Context, schema: Schema) { } val filestream: MutableList = mutableListOf() // folder with threadId to avoid deleting files from other threads - val threadId = Thread.currentThread().hashCode().toString() + Thread.currentThread().id.toString() - val basepath = Path.of("upload/$threadId/") + val uuid = UUID.randomUUID(); + val basepath = Path.of("upload/$uuid/") + try { ctx.uploadedFiles("data").forEach { uploadedFile -> val path = Path.of("$basepath/${uploadedFile.filename()}") @@ -54,13 +61,55 @@ fun executeIngest(ctx: Context, schema: Schema) { val pipelineBuilder = pipelineName?.let { schema.getPipelineBuilder(it) } ?: throw ErrorStatusException(400, "Invalid request: Pipeline '$pipelineName' does not exist.") val pipeline = pipelineBuilder.getApiPipeline(stream) - val server = ExecutionServer().extract(pipeline) + + schema.getExecutionServer().enqueueIndexJob(pipeline, uuid) + ctx.json(mapOf("id" to uuid)) + } catch (e: Exception) { throw ErrorStatusException(400, "Invalid request: ${e.message}") } finally { - filestream.forEach() { - file -> file.deleteIfExists() + filestream.forEach() { file -> + file.deleteIfExists() } basepath.deleteIfExists() } +} + +/** + * + * @author Raphael + * @version 1.0 + */ +@OpenApi( + path = "/api/{schema}/index/{id}", + methods = [HttpMethod.GET], + summary = "Indexes an item, adding it to the defined schema.", + operationId = "postExecuteIngest", + tags = ["Ingest"], + pathParams = [ + OpenApiParam( + "schema", + type = String::class, + description = "The name of the schema to execute a query for.", + required = true + ), OpenApiParam( + "id", + type = String::class, + description = "The id querying the state.", + required = true + ) + ], + responses = [ + OpenApiResponse("200", [OpenApiContent(Any::class)]), + OpenApiResponse("400", [OpenApiContent(ErrorStatus::class)]) + ] +) +fun executeIngestStatus(ctx: Context, schema: Schema) { + val id = try { + UUID.fromString(ctx.pathParam("id")) + } catch (e: Exception) { + throw ErrorStatusException(400, "Invalid request: ${e.message}") + } + val status = schema.getExecutionServer().isPending(id) + ctx.json(mapOf("status" to status)) } \ No newline at end of file diff --git a/vitrivr-engine-server/src/main/resources/log4j2.xml b/vitrivr-engine-server/src/main/resources/log4j2.xml index c63723df..40763013 100644 --- a/vitrivr-engine-server/src/main/resources/log4j2.xml +++ b/vitrivr-engine-server/src/main/resources/log4j2.xml @@ -33,15 +33,15 @@ Valid values for this attribute are "off", "trace", "debug", "info", "warn", "er - + - + - + - +