
Commit

Merge remote-tracking branch 'origin/dev' into dev
net-cscience-raphael committed Oct 31, 2024
2 parents d3a4c63 + 8c0398f commit f314526
Showing 5 changed files with 101 additions and 17 deletions.
@@ -30,7 +30,7 @@ class VideoSourceMetadataDescriptor(
val height: Value.Int by this.values

/** The duration of the video source. */
val duration: Value.Double by this.values
val duration: Value.Long by this.values

/** The number of visual frames per second. */
val fps: Value.Double by this.values
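The fields touched here are map-delegated properties. For context, the snippet below is a minimal, self-contained sketch of the Kotlin map-backed property delegation that the `by this.values` declarations rely on; the plain `Int`/`Long`/`Double` types are stand-ins for the project's `Value` wrappers, not the actual API. It illustrates why the declared type of `duration` has to agree with the value actually stored: a mismatch only surfaces as a `ClassCastException` when the property is read.

```kotlin
// Simplified stand-in for the descriptor above: properties are delegated to a map of values.
class VideoMetadataSketch(private val values: Map<String, Any?>) {
    val width: Int by values      // resolved as values["width"]
    val height: Int by values     // resolved as values["height"]
    val duration: Long by values  // wrong declared type => ClassCastException on first access
    val fps: Double by values
}

fun main() {
    val meta = VideoMetadataSketch(
        mapOf("width" to 1920, "height" to 1080, "duration" to 120_000L, "fps" to 29.97)
    )
    println("${meta.width}x${meta.height}, duration=${meta.duration}, fps=${meta.fps}")
}
```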
@@ -111,7 +111,7 @@ class PgVectorConnection(provider: PgVectorConnectionProvider, schemaName: Strin
/**
* Returns the human-readable description of this [PgVectorConnection].
*/
override fun description(): String = this.jdbc.toString()
override fun description(): String = this.jdbc.metaData.url

/**
* Closes this [PgVectorConnection]
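For context on the `description()` change: `Connection.toString()` usually falls back to the default object representation (class name plus identity hash), whereas `DatabaseMetaData.getURL()` returns the JDBC URL the connection was opened with. The sketch below demonstrates the difference against a plain `java.sql.Connection`; the URL and credentials are placeholder assumptions, not values from this repository.

```kotlin
import java.sql.DriverManager

fun main() {
    // Assumed connection details, for illustration only.
    val url = "jdbc:postgresql://localhost:5432/postgres"
    DriverManager.getConnection(url, "postgres", "postgres").use { conn ->
        // Typically just the driver class and an identity hash -- not useful in logs.
        println(conn.toString())

        // The JDBC URL, which is what the updated description() now reports.
        println(conn.metaData.url)
    }
}
```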
@@ -13,7 +13,8 @@ import java.sql.*
* @author Ralph Gasser
* @version 1.0.0
*/
open class PgDescriptorWriter<D : Descriptor<*>>(final override val field: Schema.Field<*, D>, override val connection: PgVectorConnection) : DescriptorWriter<D> {
open class PgDescriptorWriter<D : Descriptor<*>>(final override val field: Schema.Field<*, D>, override val connection: PgVectorConnection, protected val batchSize: Int = 1000) : DescriptorWriter<D> {

/** The name of the table backing this [PgDescriptorWriter]. */
protected val tableName: String = "${DESCRIPTOR_ENTITY_PREFIX}_${this.field.fieldName}"

@@ -56,6 +57,9 @@ open class PgDescriptorWriter<D : Descriptor<*>>(final override val field: Schem
*/
override fun addAll(items: Iterable<D>): Boolean {
try {
this.connection.jdbc.autoCommit = false
var success = true
var batched = 0
this.prepareInsertStatement().use { stmt ->
for (item in items) {
stmt.setObject(1, item.id)
@@ -70,12 +74,37 @@
}
}
stmt.addBatch()
batched += 1

/* Execute batch if necessary. */
if (batched % this.batchSize == 0) {
val results = stmt.executeBatch()
batched = 0
stmt.clearBatch()
if (results.any { it != 1 }) {
success = false
break
}
}
}

/* Execute remaining batch and commit. */
if (batched > 0) {
success = stmt.executeBatch().all { it == 1 }
}
if (success) {
this.connection.jdbc.commit()
} else {
this.connection.jdbc.rollback()
}
return stmt.executeBatch().all { it == 1 }
return success
}
} catch (e: SQLException) {
LOGGER.error(e) { "Failed to INSERT descriptors into \"${tableName.lowercase()}\" due to SQL error." }
this.connection.jdbc.rollback()
return false
} finally {
this.connection.jdbc.autoCommit = true
}
}

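The new `addAll()` combines JDBC statement batching with manual transaction control: auto-commit is switched off, full batches of `batchSize` rows are flushed with `executeBatch()`, the transaction is committed only if every row reported exactly one affected row, and the previous auto-commit mode is restored in `finally`. Below is a self-contained sketch of the same pattern against a plain `java.sql.Connection`; the table and column names are invented for illustration and are not part of this codebase.

```kotlin
import java.sql.Connection
import java.sql.SQLException

fun insertAllBatched(connection: Connection, rows: Iterable<Pair<String, String>>, batchSize: Int = 1000): Boolean {
    try {
        connection.autoCommit = false // one transaction spanning all batches
        var success = true
        var batched = 0
        connection.prepareStatement("INSERT INTO example_table (id, value) VALUES (?, ?)").use { stmt ->
            for ((id, value) in rows) {
                stmt.setString(1, id)
                stmt.setString(2, value)
                stmt.addBatch()
                batched += 1

                /* Flush a full batch; any row not reporting exactly one update counts as failure. */
                if (batched % batchSize == 0) {
                    val results = stmt.executeBatch()
                    batched = 0
                    stmt.clearBatch()
                    if (results.any { it != 1 }) {
                        success = false
                        break
                    }
                }
            }

            /* Flush the remainder, then commit or roll back the whole transaction. */
            if (success && batched > 0) {
                success = stmt.executeBatch().all { it == 1 }
            }
            if (success) connection.commit() else connection.rollback()
            return success
        }
    } catch (e: SQLException) {
        connection.rollback()
        return false
    } finally {
        connection.autoCommit = true // restore the connection's default mode
    }
}
```

Keeping each batch bounded caps what the client buffers per round trip, while the single surrounding transaction keeps a failed bulk insert from leaving partial data behind.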
@@ -12,7 +12,7 @@ import java.sql.SQLException
* @author Ralph Gasser
* @version 1.0.0
*/
internal class RetrievableWriter(override val connection: PgVectorConnection): RetrievableWriter {
internal class RetrievableWriter(override val connection: PgVectorConnection, private val batchSize: Int = 1000) : RetrievableWriter {
/**
* Adds a new [Retrievable] to the database using this [RetrievableWriter] instance.
*
@@ -38,17 +38,45 @@
*/
override fun addAll(items: Iterable<Retrievable>): Boolean {
try {
this.connection.jdbc.autoCommit = false
var success = true
var batched = 0
this.connection.jdbc.prepareStatement("INSERT INTO $RETRIEVABLE_ENTITY_NAME ($RETRIEVABLE_ID_COLUMN_NAME, $RETRIEVABLE_TYPE_COLUMN_NAME) VALUES (?, ?);").use { stmt ->
for (item in items) {
stmt.setObject(1, item.id)
stmt.setString(2, item.type)
stmt.addBatch()
batched += 1

/* Execute batch if necessary. */
if (batched % this.batchSize == 0) {
val results = stmt.executeBatch()
batched = 0
stmt.clearBatch()
if (results.any { it != 1 }) {
success = false
break
}
}
}

/* Execute remaining batch. */
if (batched > 0) {
success = stmt.executeBatch().all { it == 1 }
}
if (success) {
this.connection.jdbc.commit()
} else {
this.connection.jdbc.rollback()
}
return stmt.executeBatch().all { it == 1 }
return success
}
} catch (e: SQLException) {
LOGGER.error(e) { "Failed to persist retrievables due to SQL error." }
this.connection.jdbc.rollback()
return false
} finally {
this.connection.jdbc.autoCommit = true
}
}

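One detail of the success check used in both batched writers: `executeBatch()` returns one entry per `addBatch()` call, and the JDBC contract allows a driver to report `Statement.SUCCESS_NO_INFO` (-2) for rows that did succeed, which can happen, for example, when the PostgreSQL driver rewrites batched inserts. The `it != 1` check treats that as a failure. The sketch below is a more tolerant variant, shown only to illustrate the trade-off, not code from this commit.

```kotlin
import java.sql.PreparedStatement
import java.sql.Statement

/** Accepts an explicit update count of 1 as well as SUCCESS_NO_INFO; EXECUTE_FAILED (-3)
 *  or any other count is treated as a failure. */
fun batchSucceeded(results: IntArray): Boolean =
    results.all { it == 1 || it == Statement.SUCCESS_NO_INFO }

fun flushBatch(stmt: PreparedStatement): Boolean {
    val results = stmt.executeBatch() // one status per addBatch() call
    stmt.clearBatch()
    return batchSucceeded(results)
}
```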
@@ -136,18 +164,46 @@ internal class RetrievableWriter(override val connection: PgVectorConnection): R
*/
override fun connectAll(relationships: Iterable<Relationship>): Boolean {
try {
this.connection.jdbc.autoCommit = false
var success = true
var batched = 0
this.connection.jdbc.prepareStatement("INSERT INTO $RELATIONSHIP_ENTITY_NAME ($OBJECT_ID_COLUMN_NAME,$PREDICATE_COLUMN_NAME,$SUBJECT_ID_COLUMN_NAME) VALUES (?,?,?)").use { stmt ->
for (relationship in relationships) {
stmt.setObject(1, relationship.objectId)
stmt.setString(2, relationship.predicate)
stmt.setObject(3, relationship.subjectId)
stmt.addBatch()
batched += 1

/* Execute batch if necessary. */
if (batched % this.batchSize == 0) {
val results = stmt.executeBatch()
batched = 0
stmt.clearBatch()
if (results.any { it != 1 }) {
success = false
break
}
}
}

/* Execute remaining batch. */
if (batched > 0) {
success = stmt.executeBatch().all { it == 1 }
}
if (success) {
this.connection.jdbc.commit()
} else {
this.connection.jdbc.rollback()
}
return stmt.executeBatch().all { it == 1 }
return success
}
} catch (e: SQLException) {
LOGGER.error(e) { "Failed to insert relationships due to SQL error." }
this.connection.jdbc.rollback()
return false
} finally {
this.connection.jdbc.autoCommit = true
}
}

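`addAll()`, `connectAll()` and `PgDescriptorWriter.addAll()` now repeat the same auto-commit bookkeeping. Purely as a hypothetical refactoring sketch (not part of this commit or of the codebase), the helper below shows how that bookkeeping could be centralized while keeping the commit-on-success / rollback-on-failure behaviour seen above.

```kotlin
import java.sql.Connection
import java.sql.SQLException

/** Runs [block] with auto-commit disabled and restores the previous mode afterwards. */
inline fun <T> Connection.withManualCommit(block: (Connection) -> T): T {
    val previous = this.autoCommit
    this.autoCommit = false
    try {
        return block(this)
    } finally {
        this.autoCommit = previous
    }
}

/** Usage sketch: commit if the writes succeed, roll back on failure or SQL error. */
fun writeTransactionally(connection: Connection, doWrites: (Connection) -> Boolean): Boolean =
    connection.withManualCommit { conn ->
        try {
            val success = doWrites(conn)
            if (success) conn.commit() else conn.rollback()
            success
        } catch (e: SQLException) {
            conn.rollback() // still in manual-commit mode at this point
            false
        }
    }
```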
@@ -197,16 +197,15 @@ class SchemaCommand(private val schema: Schema, private val server: ExecutionSer
/** Migrate retrievables */
val currentRetrievablesReader = this.schema.connection.getRetrievableReader()
val targetRetrievablesWriter = targetSchema.connection.getRetrievableWriter()
targetRetrievablesWriter.addAll(currentRetrievablesReader.getAll().toList())
logger.info { "Migrated ${currentRetrievablesReader.count()} retrievables." }
targetRetrievablesWriter.addAll(currentRetrievablesReader.getAll().asIterable())
logger.info { "Migrated retrievables." }

/** Migrate relationships */
val relations = currentRetrievablesReader.getConnections(emptyList(), emptyList(), emptyList())
.map{ (subjectid, predicate, objectid) ->
Relationship.ById(subjectid, predicate, objectid, false)
}.toList()
val relations = currentRetrievablesReader.getConnections(emptyList(), emptyList(), emptyList()).map { (subjectid, predicate, objectid) ->
Relationship.ById(subjectid, predicate, objectid, false)
}.asIterable()
targetRetrievablesWriter.connectAll(relations)
logger.info { "Migrated ${relations.size} relationships." }
logger.info { "Migrated relationships." }

/** Migrate Fields */
val currentFields = this.schema.fields()
@@ -221,10 +220,10 @@ class SchemaCommand(private val schema: Schema, private val server: ExecutionSer
zippedFields.forEach{ (currField, tarField) ->
val oldReader = currField.getReader()
val newWriter = tarField.getWriter()
newWriter.addAll(oldReader.getAll().toList())
newWriter.addAll(oldReader.getAll().asIterable())
logger.info { "Migrated field '${currField.fieldName}'." }
}
logger.info{ "Migrated ${currentFields.size} fields." }
logger.info{ "Migration complete."}
logger.info { "Migration complete (${currentFields.size} migrated)." }
}
}
}
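The migration changes in `SchemaCommand` swap `.toList()` for `.asIterable()` when handing the readers' results to the batched writers, and drop the `count()`/`size`-based log messages. Assuming `getAll()` and `getConnections()` return lazy `Sequence`s (which the previous `toList()` calls suggest), this lets the writers pull elements batch by batch instead of materializing the whole result set in memory first. The sketch below uses a toy data source, not the actual reader API, to show the difference.

```kotlin
fun main() {
    var produced = 0
    val source: Sequence<Int> = generateSequence(1) { it + 1 }
        .onEach { produced++ }   // count how many elements the source actually yields
        .take(1_000_000)

    // Lazy hand-off: the consumer drives production, so only the consumed prefix is ever created.
    for (item in source.asIterable()) {
        if (item == 10) break
    }
    println("elements produced after lazy consumption: $produced") // 10

    // Eager hand-off (what .toList() did): all 1,000,000 elements are materialized up front.
    val eager: List<Int> = source.toList()
    println("elements produced after toList(): $produced") // 1,000,010
}
```

The simplified log messages follow from the same change: an `Iterable` backed by a lazy sequence has no cheap element count to report.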
