Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor pipelines to include contentSources #83

Merged
merged 50 commits into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
f753b9b
refactor pipeliens
faberf Jul 8, 2024
d237eda
adding textquery
faberf Jul 10, 2024
3a5dec1
retrieval task instructions
faberf Jul 10, 2024
d13422c
Initial draft of ContentMergingTransformer
v0idness Jul 11, 2024
9d2c414
Merge branch 'feature/contentpipelines' of github.com:vitrivr/vitrivr…
v0idness Jul 11, 2024
f19fe95
Extended ContentMergingTransformer to include a template and fill mat…
v0idness Jul 15, 2024
6fe6bbb
added unnormalized fusion and fixed score bug
faberf Jul 17, 2024
7b4540b
Merge branch 'feature/contentpipelines' of github.com:vitrivr/vitrivr…
faberf Jul 17, 2024
62a3f7e
Tested ContentMergingTransformer on sample pipeline; pipeline config …
v0idness Jul 17, 2024
1e9837a
Merge branches 'feature/contentpipelines' and 'feature/contentpipelin…
v0idness Jul 17, 2024
1c83e7e
bidirectional content author maps
faberf Jul 17, 2024
a08ab40
allow image captioning to use content as prompt
faberf Jul 17, 2024
01d0ee3
Updated ContentMergingTransformer to transform content to content rat…
v0idness Jul 18, 2024
f64358c
Refactored parameters to reduce redundancy: content to include taken …
v0idness Jul 18, 2024
54e244a
Merge branch 'feature/contentpipelines' of github.com:vitrivr/vitrivr…
v0idness Jul 18, 2024
93ec49a
Refactor regex to not be applied for each retrievable
v0idness Jul 24, 2024
7f4f1b3
bug fixes for content pipelines
faberf Jul 29, 2024
e186494
Merge branch 'feature/contentpipelines' of github.com:vitrivr/vitrivr…
faberf Jul 29, 2024
2d460aa
Merge remote-tracking branch 'origin/dev' into feature/contentpipelines
faberf Jul 31, 2024
e34f891
simplified passthrough, renamed templatetext
faberf Jul 31, 2024
ffe3805
WIP: committing current changes
faberf Aug 8, 2024
bfff8b7
Merge remote-tracking branch 'origin/dev' into feature/contentpipelines
faberf Aug 8, 2024
7b132bd
finished merge with dev
faberf Aug 8, 2024
f2a10ec
reimplemented batched extraction
faberf Aug 8, 2024
6b36992
WIP on feature/contentpipelines
net-cscience-raphael Aug 12, 2024
d93d940
improves log message
net-cscience-raphael Aug 14, 2024
23522da
Define default string and regex as constant
v0idness Aug 14, 2024
e147e2a
Deleted test schema and pipeline files
v0idness Aug 14, 2024
001d47a
Merge branch 'feature/contentpipelines' of github.com:vitrivr/vitrivr…
net-cscience-raphael Aug 14, 2024
af39709
small refactoring
faberf Aug 14, 2024
c650d1a
Merge branch 'feature/contentpipelines' of github.com:vitrivr/vitrivr…
faberf Aug 14, 2024
9c6238e
Adds filter threshold and topk
net-cscience-raphael Aug 14, 2024
544910a
Merge branch 'feature/contentpipelines' of github.com:vitrivr/vitrivr…
net-cscience-raphael Aug 14, 2024
446b6f1
Merge branch 'feature/contentpipelines' of github.com:vitrivr/vitrivr…
faberf Aug 14, 2024
2191431
Merge branch 'feature/contentpipelines' of github.com:vitrivr/vitrivr…
faberf Aug 14, 2024
e2815f7
adds missing field
net-cscience-raphael Aug 15, 2024
442a38f
adds escaping entity name for pg
net-cscience-raphael Aug 15, 2024
0854d69
Adds retrievableId for persisting
net-cscience-raphael Aug 15, 2024
7ade79b
removes postgres escapeing
net-cscience-raphael Aug 15, 2024
1131708
Escapes and lowercases field names
net-cscience-raphael Aug 15, 2024
8286c85
Merge remote-tracking branch 'origin/dev' into feature/contentpipelines
faberf Aug 16, 2024
29f800b
escapes schema in PGVectorConnection
net-cscience-raphael Aug 16, 2024
7127034
changes credentials for test db
net-cscience-raphael Aug 16, 2024
7b79384
maintains lc pg naming convention
net-cscience-raphael Aug 16, 2024
29b105e
bugfix
net-cscience-raphael Aug 19, 2024
bf8b78f
bugfix
net-cscience-raphael Aug 19, 2024
8516eeb
Merge branch 'dev' into feature/contentpipelines
net-cscience-raphael Aug 19, 2024
2cc9cbe
Merge branch 'dev' into feature/contentpipelines
net-cscience-raphael Aug 19, 2024
5fecf89
debug CI
net-cscience-raphael Aug 19, 2024
3f0076f
adds change due immutable retrievable
net-cscience-raphael Aug 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 127 additions & 0 deletions test-pipeline.json
v0idness marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
{
"schema": "test",
"context": {
"contentFactory": "CachedContentFactory",
"resolverName": "disk",
"local": {
"content": {
"path": "../cache"
},
"enumerator": {
"path": "../benchmark/media_objects",
"depth": "5"
},
"image_source_filter": {
"type": "SOURCE:IMAGE"
},
"video_source_filter": {
"type": "SOURCE:VIDEO"
},
"ocr_content": {
"field": "ocr_sparse",
"removeContent": "true"
},
"asr_content": {
"field": "asr_sparse",
"removeContent": "true"
},
"caption_content": {
"field": "caption_sparse",
"removeContent": "true"
},
"video_decoder": {
"timeWindowMs": "10000"
},
"ocr_sparse": {
"contentSources": "image_decoder,selector"
},
"caption_sparse": {
"contentSources": "image_decoder,selector"
},
"asr_sparse": {
"contentSources": "video_decoder"
},
"merge_prompt": {
"template": "test $asr_content ASR \n $caption_content CAPTION \n $ocr_content OCR",
"defaultValue": "no content provided"
}
}
},
"operators": {
"passthrough": {
"type": "TRANSFORMER",
"factory": "PassthroughTransformer"
},
"enumerator": {
"type": "ENUMERATOR",
"factory": "FileSystemEnumerator",
"mediaTypes": ["IMAGE", "VIDEO"]
},
"image_decoder": {
"type": "DECODER",
"factory": "ImageDecoder"
},
"video_decoder": {
"type": "DECODER",
"factory": "VideoDecoder"
},
"file_metadata":{
"type": "EXTRACTOR",
"fieldName": "file"
},
"ocr_sparse": {
"type": "EXTRACTOR",
"fieldName": "ocr_sparse"
},
"caption_sparse": {
"type": "EXTRACTOR",
"fieldName": "caption_sparse"
},
"asr_sparse": {
"type": "EXTRACTOR",
"fieldName": "asr_sparse"
},
"ocr_content": {
"type": "TRANSFORMER",
"factory": "DescriptorAsContentTransformer"
},
"asr_content": {
"type": "TRANSFORMER",
"factory": "DescriptorAsContentTransformer"
},
"caption_content": {
"type": "TRANSFORMER",
"factory": "DescriptorAsContentTransformer"
},
"merge_prompt": {
"type": "TRANSFORMER",
"factory": "ContentMergingTransformer"
},
"selector": {
"type": "TRANSFORMER",
"factory": "LastContentAggregator"
},
"time":{
"type": "EXTRACTOR",
"fieldName": "time"
}
},
"operations": {
"enumerator-stage": {"operator": "enumerator"},
"video-decoder-stage": {"operator": "video_decoder", "inputs": ["enumerator-stage"]},
"time-stage": {"operator": "time","inputs": ["video-decoder-stage"]},
"image-decoder-stage": {"operator": "image_decoder", "inputs": ["enumerator-stage"]},
"selector-stage": {"operator": "selector", "inputs": ["time-stage"]},
"video-ocr-sparse-stage": {"operator": "ocr_sparse", "inputs": ["selector-stage"]},
"video-ocr-content-stage": {"operator": "ocr_content", "inputs": ["video-ocr-sparse-stage"]},
"video-caption-sparse-stage": {"operator": "caption_sparse", "inputs": ["selector-stage"]},
"video-caption-content-stage": {"operator": "caption_content", "inputs": ["video-caption-sparse-stage"]},
"asr-sparse-stage": {"operator": "asr_sparse", "inputs": ["time-stage"]},
"asr-content-stage": {"operator": "asr_content", "inputs": ["asr-sparse-stage"]},
"prompt": {"operator": "merge_prompt", "inputs": ["asr-content-stage", "video-caption-content-stage", "video-ocr-content-stage"], "merge": "COMBINE"}
},
"output": [
"prompt"
],
"mergeType": "MERGE"
}
86 changes: 86 additions & 0 deletions test-schema.json
v0idness marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
{
"schemas": [
{
"name": "test",
"connection": {
"database": "CottontailConnectionProvider",
"parameters": {
"Host": "127.0.0.1",
"port": "1865"
}
},
"fields": [
{
"name": "averagecolor",
"factory": "AverageColor"
},
{
"name": "file",
"factory": "FileSourceMetadata"
},
{
"name": "time",
"factory": "TemporalMetadata"
},
{
"name": "video",
"factory": "VideoSourceMetadata"
},
{
"name": "asr_sparse",
"factory": "ASR",
"parameters": {
"host": "http://10.34.64.84:8888/",
"model": "whisper",
"timeoutSeconds": "100",
"retries":"1000"
}
},
{
"name": "caption_sparse",
"factory": "ImageCaption",
"parameters": {
"host": "http://10.34.64.84:8888/",
"timeoutSeconds": "100",
"retries":"1000"
}
},
{
"name": "ocr_sparse",
"factory": "OCR",
"parameters": {
"host": "http://10.34.64.84:8888/",
"model": "tesseract",
"timeoutSeconds": "100",
"retries":"1000"
}
}
],
"resolvers": {
"disk": {
"factory": "DiskResolver",
"parameters": {
"location": "../thumbnails"
}
}
},
"exporters": [
{
"name": "thumbnail",
"factory": "ThumbnailExporter",
"resolverName": "disk",
"parameters": {
"maxSideResolution": "400",
"mimeType": "JPG"
}
}
],
"extractionPipelines": [
{
"name": "full",
"path": "./test-pipeline.json"
}
]
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package org.vitrivr.engine.core.config.ingest
import io.github.oshai.kotlinlogging.KLogger
import io.github.oshai.kotlinlogging.KotlinLogging
import org.vitrivr.engine.core.config.IndexContextFactory
import org.vitrivr.engine.core.config.ingest.operation.BaseOperation
import org.vitrivr.engine.core.config.ingest.operation.Operation
import org.vitrivr.engine.core.config.ingest.operation.PassthroughOperation
import org.vitrivr.engine.core.config.ingest.operator.OperatorConfig
import org.vitrivr.engine.core.context.IndexContext
import org.vitrivr.engine.core.model.content.element.ContentElement
Expand Down Expand Up @@ -55,6 +57,7 @@ class IngestionPipelineBuilder(val config: IngestionConfig) {
@Suppress("UNCHECKED_CAST")
fun build(stream: Stream<*>? = null): List<Operator.Sink<Retrievable>> {
return parseOperations().map { root ->

val config = root.opConfig as? OperatorConfig.Enumerator ?: throw IllegalArgumentException("Root stage must always be an enumerator!")
val built = HashMap<String, Operator<*>>()
built[root.name] = buildEnumerator(root.opName, config, stream)
Expand Down Expand Up @@ -82,11 +85,11 @@ class IngestionPipelineBuilder(val config: IngestionConfig) {
/**
* This is an internal function that can be called recursively to build the [Operator] DAG.
*
* @param operation The [Operation] to build.
* @param operation The [IOperation] to build.
faberf marked this conversation as resolved.
Show resolved Hide resolved
* @param memoizationTable The memoization table that holds the already built operators.
* @return The built [Operator].
*/
private fun buildInternal(operation: Operation, memoizationTable: MutableMap<String, Operator<*>>, breakAt: Operation? = null) {
private fun buildInternal(operation: BaseOperation, memoizationTable: MutableMap<String, Operator<*>>, breakAt: BaseOperation? = null) {
/* Find all required input operations and merge them (if necessary). */
if (operation == breakAt) return
val inputs = operation.input.map {
Expand All @@ -107,11 +110,23 @@ class IngestionPipelineBuilder(val config: IngestionConfig) {
}

/* Prepare and cache operator. */
var operator = buildOperator(operation.opName, op, operation.opConfig)
if (operation.output.size > 1) {
operator = BroadcastOperator(operator)
when(operation) {
is Operation -> {
val operator = buildOperator(operation.opName, op, operation.opConfig)
if (operation.output.size > 1) {
memoizationTable[operation.name] = BroadcastOperator(operator)
} else {
memoizationTable[operation.name] = operator
}
}
is PassthroughOperation -> {
if (operation.output.size > 1) {
memoizationTable[operation.name] = BroadcastOperator(op)
} else {
memoizationTable[operation.name] = op
}
}
}
memoizationTable[operation.name] = operator

/* Process output operators. */
for (output in operation.output) {
Expand All @@ -133,17 +148,28 @@ class IngestionPipelineBuilder(val config: IngestionConfig) {

/* Build trees with entry points as roots. */
return entrypoints.map {
val stages = HashMap<String, Operation>()
val root = Operation(it.key, it.value.operator, config.operators[it.value.operator] ?: throw IllegalArgumentException("Undefined operator '${it.value.operator}'"), it.value.merge)
val stages = HashMap<String, BaseOperation>()
val root = Operation(it.key, it.value.operator as String, config.operators[it.value.operator] ?: throw IllegalArgumentException("Undefined operator '${it.value.operator}'"), it.value.merge)
faberf marked this conversation as resolved.
Show resolved Hide resolved
stages[it.key] = root
for (operation in this.config.operations) {
if (!stages.containsKey(operation.key)) {
stages[operation.key] = Operation(operation.key, operation.value.operator, config.operators[operation.value.operator] ?: throw IllegalArgumentException("Undefined operator '${operation.value.operator}'"), operation.value.merge)
when(operation.value.operator) {
is String ->
stages[operation.key] = Operation(
operation.key,
operation.value.operator as String,
faberf marked this conversation as resolved.
Show resolved Hide resolved
config.operators[operation.value.operator as String] ?: throw IllegalArgumentException("Undefined operator '${operation.value.operator}'"),
operation.value.merge
)

null ->
stages[operation.key] = PassthroughOperation(operation.key, operation.value.merge)
}
}
for (inputKey in operation.value.inputs) {
if (!stages.containsKey(inputKey)) {
val op = this.config.operations[inputKey] ?: throw IllegalArgumentException("Undefined operation '${inputKey}'")
stages[inputKey] = Operation(inputKey, op.operator, config.operators[op.operator] ?: throw IllegalArgumentException("Undefined operator '${op.operator}'"), op.merge)
stages[inputKey] = Operation(inputKey, op.operator as String, config.operators[op.operator] ?: throw IllegalArgumentException("Undefined operator '${op.operator}'"), op.merge)
}
stages[operation.key]?.addInput(stages[inputKey]!!)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,44 +5,62 @@ import org.vitrivr.engine.core.operators.transform.shape.MergeType
import java.util.*

/**
* This [Operation] class represents a single operation in the ingest pipeline.
*
* @author Ralph Gasser
* @version 1.0.0
* This sealed class represents a base operation in the ingest pipeline.
*/
data class Operation(val name: String, val opName: String, val opConfig: OperatorConfig, val merge: MergeType? = null) {
sealed class BaseOperation(val name: String, val merge: MergeType?) {
faberf marked this conversation as resolved.
Show resolved Hide resolved

/** A [LinkedList] of all input [Operation]s. */
private val _input = LinkedList<Operation>()
/** A [LinkedList] of all input [BaseOperation]s. */
private val _input = LinkedList<BaseOperation>()

/** A [LinkedList] of all output [Operation]s. */
private val _output = LinkedList<Operation>()
/** A [LinkedList] of all output [BaseOperation]s. */
private val _output = LinkedList<BaseOperation>()

/** A [List] of all input [Operation]s. */
val input: List<Operation>
/** A [List] of all input [BaseOperation]s. */
val input: List<BaseOperation>
get() = Collections.unmodifiableList(this._input)

/** A [List] of all output [Operation]s. */
val output: List<Operation>
/** A [List] of all output [BaseOperation]s. */
val output: List<BaseOperation>
get() = Collections.unmodifiableList(this._output)

/**
* Adds an input [Operation] to this [Operation].
* Adds an input [BaseOperation] to this [BaseOperation].
*
* @param operation The [Operation] to add.
* @param operation The [BaseOperation] to add.
*/
fun addInput(operation: Operation) {
fun addInput(operation: BaseOperation) {
this._input.add(operation)
operation._output.add(this)
operation.internalAddOutput(this)
}

/**
* Adds an output [Operation] to this [Operation].
* Adds an output [BaseOperation] to this [BaseOperation].
*
* @param operation The [Operation] to add.
* @param operation The [BaseOperation] to add.
*/
fun addOutput(operation: Operation) {
fun addOutput(operation: BaseOperation) {
this._output.add(operation)
operation.internalAddInput(this)
}

protected fun internalAddInput(operation: BaseOperation) {
this._input.add(operation)
}

protected fun internalAddOutput(operation: BaseOperation) {
this._output.add(operation)
operation._input.add(this)
}
}
}

/**
* This [Operation] class represents a single operation in the ingest pipeline.
*
* @param opName The specific operation name.
* @param opConfig The configuration for the operation.
*/
class Operation(name: String, val opName: String, val opConfig: OperatorConfig, merge: MergeType? = null) : BaseOperation(name, merge)

/**
* This [PassthroughOperation] class represents a passthrough operation in the ingest pipeline.
*/
class PassthroughOperation(name: String, merge: MergeType? = null) : BaseOperation(name, merge)
Loading