Skip to content

Commit

Permalink
Addresses some of the review comments made by me.
Browse files Browse the repository at this point in the history
  • Loading branch information
Ralph Gasser committed Nov 5, 2024
1 parent 33e3388 commit e94e7f4
Showing 1 changed file with 170 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ import kotlinx.coroutines.channels.ProducerScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.channelFlow
import org.bytedeco.javacv.FrameGrabber
import kotlinx.coroutines.runBlocking
import org.vitrivr.engine.core.context.IndexContext
import org.vitrivr.engine.core.model.content.element.AudioContent
import org.vitrivr.engine.core.model.content.element.ImageContent
import org.vitrivr.engine.core.model.relationship.Relationship
import org.vitrivr.engine.core.model.retrievable.Ingested
import org.vitrivr.engine.core.model.retrievable.Retrievable
Expand All @@ -24,29 +26,43 @@ import org.vitrivr.engine.core.operators.ingest.Decoder
import org.vitrivr.engine.core.operators.ingest.DecoderFactory
import org.vitrivr.engine.core.operators.ingest.Enumerator
import org.vitrivr.engine.core.source.MediaType
import org.vitrivr.engine.core.source.Metadata
import org.vitrivr.engine.core.source.Source
import org.vitrivr.engine.core.source.file.FileSource
import java.awt.image.BufferedImage
import java.nio.ShortBuffer
import java.nio.file.Path
import java.util.*
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import org.vitrivr.engine.core.source.Metadata

/**
* A [Decoder] that can decode [ImageContent] and [AudioContent] from a [Source] of [MediaType.VIDEO].
*
* Based on Jaffree FFmpeg wrapper, which spawns a new FFmpeg process for each [Source].
*
* @author Luca Rossetto
* @author Ralph Gasser
* @version 1.0.0
*/
class FFmpegVideoDecoder : DecoderFactory {

override fun newDecoder(name: String, input: Enumerator, context: IndexContext): Decoder {
val maxWidth = context[name, "maxWidth"]?.toIntOrNull() ?: 3840
val maxHeight = context[name, "maxHeight"]?.toIntOrNull() ?: 2160
val framerate = context[name, "framerate"]?.toIntOrNull()
val video = context[name, "video"]?.let { it.lowercase() == "true" } ?: true
val audio = context[name, "audio"]?.let { it.lowercase() == "true" } ?: true
val timeWindowMs = context[name, "timeWindowMs"]?.toLongOrNull() ?: 500L
val ffmpegPath = context[name, "ffmpegPath"]?.let { Path.of(it) }

return Instance(input, context, timeWindowMs, maxWidth, maxHeight, framerate, name, ffmpegPath)
return Instance(input, context, video, audio, timeWindowMs, maxWidth, maxHeight, framerate, name, ffmpegPath)
}

private class Instance(
override val input: Enumerator,
private val context: IndexContext,
private val video: Boolean = true,
private val audio: Boolean = true,
private val timeWindowMs: Long = 500L,
private val maxWidth: Int,
private val maxHeight: Int,
Expand Down Expand Up @@ -81,8 +97,8 @@ class FFmpegVideoDecoder : DecoderFactory {
}
}.execute()

/* Extract metadata. */
val videoStreamInfo = probeResult.streams.find { it.codecType == StreamType.VIDEO }

if (videoStreamInfo != null) {
source.metadata[Metadata.METADATA_KEY_VIDEO_FPS] = videoStreamInfo.avgFrameRate.toDouble()
source.metadata[Metadata.METADATA_KEY_AV_DURATION] = (videoStreamInfo.duration * 1000f).toLong()
Expand All @@ -91,140 +107,194 @@ class FFmpegVideoDecoder : DecoderFactory {
}

val audioStreamInfo = probeResult.streams.find { it.codecType == StreamType.AUDIO }

if (audioStreamInfo != null) {
source.metadata[Metadata.METADATA_KEY_AUDIO_CHANNELS] = audioStreamInfo.channels
source.metadata[Metadata.METADATA_KEY_AUDIO_SAMPLERATE] = audioStreamInfo.sampleRate
source.metadata[Metadata.METADATA_KEY_AUDIO_SAMPLESIZE] = audioStreamInfo.sampleFmt
}

var windowEnd = TimeUnit.MILLISECONDS.toMicros(this@Instance.timeWindowMs)

val imageTransferBuffer = LinkedBlockingQueue<Pair<BufferedImage, Long>>(10)

val ffmpegInstance = ffmpeg.addInput(
/* Create consumer. */
val consumer = InFlowFrameConsumer(this, sourceRetrievable)
val ffmpegInstance = this@Instance.ffmpeg.addInput(
if (source is FileSource) {
UrlInput.fromPath(source.path)
} else {
PipeInput.pumpFrom(source.newInputStream())
}
).addOutput(
FrameOutput.withConsumerAlpha(
object : FrameConsumer {
).addOutput(FrameOutput.withConsumerAlpha(consumer))

val streamMap = mutableMapOf<Int, Stream>()
/* Execute. */
ffmpegInstance.execute()

override fun consumeStreams(streams: MutableList<Stream>) {
streams.forEach { stream -> streamMap[stream.id] = stream }
}
/* Emit final frames. */
if (!consumer.isEmpty()) {
consumer.emit()
}

override fun consume(frame: Frame) {
/* Emit source retrievable. */
send(sourceRetrievable)
}
}.buffer(capacity = RENDEZVOUS, onBufferOverflow = BufferOverflow.SUSPEND)

val stream = streamMap[frame.streamId] ?: return

when (stream.type) {
Stream.Type.VIDEO -> {
imageTransferBuffer.put(frame.image!! to (1000000 * frame.pts) / stream.timebase)
}
/**
* A [FrameConsumer] that emits [Retrievable]s to the downstream [channel].
*/
private inner class InFlowFrameConsumer(private val channel: ProducerScope<Retrievable>, val source: Retrievable) : FrameConsumer {

Stream.Type.AUDIO -> {
//TODO
}
/** The video [Stream] processed by this [InFlowFrameConsumer]. */
var videoStream: Stream? = null
private set

null -> {
/* ignore */
}
}
}
/** The audio [Stream] processed by this [InFlowFrameConsumer]. */
var audioStream: Stream? = null
private set

}
)
).setFilter(
StreamType.VIDEO,
"scale=w='min($maxWidth,iw)':h='min($maxHeight,ih)':force_original_aspect_ratio=decrease${if (framerate != null && framerate > 0) ",fps=$framerate" else ""}'"
)
/** The end of the time window. */
var windowEnd = TimeUnit.MILLISECONDS.toMicros(this@Instance.timeWindowMs)
private set

//TODO audio settings
/** Flag indicating, that video is ready to be emitted. */
var videoReady = false

val future = ffmpegInstance.executeAsync()
/** Flag indicating, that audio is ready to be emitted. */
var audioReady = false

val localImageBuffer = LinkedList<Pair<BufferedImage, Long>>()
/** [List] of grabbed [BufferedImage]s. */
val imageBuffer: List<Pair<BufferedImage,Long>> = LinkedList()

while (!(future.isDone || future.isCancelled) || imageTransferBuffer.isNotEmpty()) {
/** [List] of grabbed [ShortBuffer]s. */
val audioBuffer: List<Pair<ShortBuffer,Long>> = LinkedList()

val next = imageTransferBuffer.poll(1, TimeUnit.SECONDS) ?: continue
localImageBuffer.add(next)

if (localImageBuffer.last().second >= windowEnd) {
emit(localImageBuffer, windowEnd, sourceRetrievable, this@channelFlow)
windowEnd += TimeUnit.MILLISECONDS.toMicros(this@Instance.timeWindowMs)
}
/**
* Returns true if both the image and audio buffer are empty.
*/
fun isEmpty(): Boolean = this.imageBuffer.isEmpty() && this.audioBuffer.isEmpty()

}
/**
* Initializes this [InFlowFrameConsumer].
*
* @param streams List of [Stream]s to initialize the [InFlowFrameConsumer] with.
*/
override fun consumeStreams(streams: MutableList<Stream>) {
this.videoStream = streams.firstOrNull { it.type == Stream.Type.VIDEO }
this.audioStream = streams.firstOrNull { it.type == Stream.Type.AUDIO }
}

while (localImageBuffer.isNotEmpty()) {
emit(localImageBuffer, windowEnd, sourceRetrievable, this@channelFlow)
windowEnd += TimeUnit.MILLISECONDS.toMicros(this@Instance.timeWindowMs)
/**
* Consumes a single [Frame].
*
* @param frame [Frame] to consume.
*/
override fun consume(frame: Frame) = runBlocking {
val stream = if (frame.streamId == this@InFlowFrameConsumer.audioStream?.id) {
this@InFlowFrameConsumer.audioStream!!
} else if (frame.streamId == this@InFlowFrameConsumer.videoStream?.id) {
this@InFlowFrameConsumer.videoStream!!
} else {
return@runBlocking
}
val timestamp = ((1000000 * frame.pts) / stream.timebase)
when (stream.type) {
Stream.Type.VIDEO -> {
(this@InFlowFrameConsumer.imageBuffer as LinkedList).add(frame.image!! to timestamp)
if (timestamp >= this@InFlowFrameConsumer.windowEnd) {
this@InFlowFrameConsumer.videoReady = true
}
}
Stream.Type.AUDIO -> {
val samples = ShortBuffer.wrap(frame.samples.map { it.toShort() }.toShortArray())
(this@InFlowFrameConsumer.audioBuffer as LinkedList).add(samples to timestamp)
if (timestamp >= this@InFlowFrameConsumer.windowEnd) {
this@InFlowFrameConsumer.audioReady = true
}
}
else -> {}
}

send(sourceRetrievable)

}
}.buffer(capacity = RENDEZVOUS, onBufferOverflow = BufferOverflow.SUSPEND)
/* If enough frames have been collected, emit them. */
if (this@InFlowFrameConsumer.videoReady && this@InFlowFrameConsumer.audioReady) {
emit()

/* Reset counters and flags. */
this@InFlowFrameConsumer.videoReady = !(this@InFlowFrameConsumer.videoStream != null && this@Instance.video)
this@InFlowFrameConsumer.audioReady = !(this@InFlowFrameConsumer.audioStream != null && this@Instance.audio)

/**
* Emits a single [Retrievable] to the downstream [channel].
*
* @param imageBuffer A [LinkedList] containing [BufferedImage] elements to emit (frames).
* @param grabber The [FrameGrabber] instance.
* @param timestampEnd The end timestamp.
* @param source The source [Retrievable] the emitted [Retrievable] is part of.
*/
private suspend fun emit(
imageBuffer: MutableList<Pair<BufferedImage, Long>>,
timestampEnd: Long,
source: Retrievable,
channel: ProducerScope<Retrievable>
) {

val emitImage = mutableListOf<BufferedImage>()

/* Drain buffer. */
imageBuffer.removeIf {
if (it.second <= timestampEnd) {
emitImage.add(it.first)
true
} else {
false
/* Update window end. */
this@InFlowFrameConsumer.windowEnd += TimeUnit.MILLISECONDS.toMicros(this@Instance.timeWindowMs)
}
}

/**
* Emits a single [Retrievable] to the downstream [channel].
*/
suspend fun emit() {
/* Audio samples. */
var audioSize = 0
val emitImage = mutableListOf<BufferedImage>()
val emitAudio = mutableListOf<ShortBuffer>()

/* Drain buffers. */
(this.imageBuffer as LinkedList).removeIf {
if (it.second <= this.windowEnd) {
emitImage.add(it.first)
true
} else {
false
}
}
(this.audioBuffer as LinkedList).removeIf {
if (it.second <= this.windowEnd) {
audioSize += it.first.limit()
emitAudio.add(it.first)
true
} else {
false
}
}

/* Prepare ingested with relationship to source. */
val ingested = Ingested(UUID.randomUUID(), "SEGMENT", false)
source.filteredAttribute(SourceAttribute::class.java)?.let { ingested.addAttribute(it) }
ingested.addRelationship(Relationship.ByRef(ingested, "partOf", source, false))
ingested.addAttribute(
TimeRangeAttribute(
timestampEnd - TimeUnit.MILLISECONDS.toMicros(this@Instance.timeWindowMs),
timestampEnd,
TimeUnit.MICROSECONDS
/* Prepare ingested with relationship to source. */
val ingested = Ingested(UUID.randomUUID(), "SEGMENT", false)
this.source.filteredAttribute(SourceAttribute::class.java)?.let { ingested.addAttribute(it) }
ingested.addRelationship(Relationship.ByRef(ingested, "partOf", source, false))
ingested.addAttribute(
TimeRangeAttribute(
this.windowEnd - TimeUnit.MILLISECONDS.toMicros(this@Instance.timeWindowMs),
this.windowEnd,
TimeUnit.MICROSECONDS
)
)
)

/* Prepare and append image content element. */
for (image in emitImage) {
val imageContent = this.context.contentFactory.newImageContent(image)
ingested.addContent(imageContent)
ingested.addAttribute(ContentAuthorAttribute(imageContent.id, name))
}
/* Prepare and append audio content element. */
if (emitAudio.size > 0) {
val samples = ShortBuffer.allocate(audioSize)
for (frame in emitAudio) {
frame.clear()
samples.put(frame)
}
samples.clear()
val audio = this@Instance.context.contentFactory.newAudioContent(
this.audioStream!!.channels.toShort(),
this.audioStream!!.sampleRate.toInt(),
samples
)
ingested.addContent(audio)
ingested.addAttribute(ContentAuthorAttribute(audio.id, name))
}

logger.debug { "Emitting ingested ${ingested.id} with ${emitImage.size} images: ${ingested.id}" }
/* Prepare and append image content element. */
for (image in emitImage) {
val imageContent = this@Instance.context.contentFactory.newImageContent(image)
ingested.addContent(imageContent)
ingested.addAttribute(ContentAuthorAttribute(imageContent.id, name))
}

/* Emit ingested. */
channel.send(ingested)
}
logger.debug { "Emitting ingested ${ingested.id} with ${emitImage.size} images and ${emitAudio.size} audio samples: ${ingested.id}" }

/* Emit ingested. */
this.channel.send(ingested)
}
}
}
}

0 comments on commit e94e7f4

Please sign in to comment.