Skip to content

Commit

Permalink
Add save sorter that drops based on duration (#153)
Browse files Browse the repository at this point in the history
A saver that drops frames based on packets is a problem because the
number of packets is very dependent on the codec and the frame rate (if
the codec is video). By making the discarding of old packets based on
time a discard window is more accurate regardless of the packets.

A feature was added to allow either by time or by total packets as
an optional argument as passed into the constructor.
  • Loading branch information
robin-raymond authored Apr 21, 2021
1 parent 9525b15 commit 846a52f
Show file tree
Hide file tree
Showing 5 changed files with 271 additions and 17 deletions.
25 changes: 21 additions & 4 deletions mkvcore/blockwriter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,14 @@ func TestBlockWriter(t *testing.T) {
{TrackNumber: 1},
{TrackNumber: 2},
}

blockSorter, err := NewMultiTrackBlockSorter(WithMaxDelayedPackets(10), WithSortRule(BlockSorterDropOutdated))
if err != nil {
t.Fatalf("Failed to create MultiTrackBlockSorter: %v", err)
}

ws, err := NewSimpleBlockWriter(buf, tracks,
WithBlockInterceptor(NewMultiTrackBlockSorter(10, BlockSorterDropOutdated)),
)
WithBlockInterceptor(blockSorter))
if err != nil {
t.Fatalf("Failed to create BlockWriter: '%v'", err)
}
Expand Down Expand Up @@ -524,8 +529,14 @@ func BenchmarkBlockWriter_InitFinalize(b *testing.B) {

for i := 0; i < b.N; i++ {
buf := buffercloser.New()

blockSorter, err := NewMultiTrackBlockSorter(WithMaxDelayedPackets(10), WithSortRule(BlockSorterDropOutdated))
if err != nil {
b.Fatalf("Failed to create MultiTrackBlockSorter: %v", err)
}

ws, err := NewSimpleBlockWriter(buf, tracks,
WithBlockInterceptor(NewMultiTrackBlockSorter(10, BlockSorterDropOutdated)),
WithBlockInterceptor(blockSorter),
)
if err != nil {
b.Fatalf("Failed to create BlockWriter: %v", err)
Expand All @@ -542,8 +553,14 @@ func BenchmarkBlockWriter_SimpleBlock(b *testing.B) {
}

buf := buffercloser.New()

blockSorter, err := NewMultiTrackBlockSorter(WithMaxDelayedPackets(10), WithSortRule(BlockSorterDropOutdated))
if err != nil {
b.Fatalf("Failed to create MultiTrackBlockSorter: %v", err)
}

ws, err := NewSimpleBlockWriter(buf, tracks,
WithBlockInterceptor(NewMultiTrackBlockSorter(10, BlockSorterDropOutdated)),
WithBlockInterceptor(blockSorter),
)
if err != nil {
b.Fatalf("Failed to create BlockWriter: %v", err)
Expand Down
6 changes: 6 additions & 0 deletions mkvcore/framebuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ func (b *frameBuffer) Head() *frame {
}
return b.buf[0]
}
func (b *frameBuffer) Tail() *frame {
if len(b.buf) == 0 {
return nil
}
return b.buf[len(b.buf)-1]
}
func (b *frameBuffer) Pop() *frame {
n := len(b.buf)
if n == 0 {
Expand Down
101 changes: 92 additions & 9 deletions mkvcore/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package mkvcore

import (
"fmt"
"io"
"sync"
)
Expand All @@ -25,6 +26,15 @@ type BlockInterceptor interface {
Intercept(r []BlockReader, w []BlockWriter)
}

// MustBlockInterceptor panics if creation of a BlockInterceptor fails, such as
// when the NewMultiTrackBlockSorter function fails.
func MustBlockInterceptor(interceptor BlockInterceptor, err error) BlockInterceptor {
if err != nil {
panic(err)
}
return interceptor
}

type filterWriter struct {
trackNumber uint64
ch chan *frame
Expand Down Expand Up @@ -65,16 +75,75 @@ const (
BlockSorterWriteOutdated
)

// NewMultiTrackBlockSorter creates BlockInterceptor, which sorts blocks on multiple tracks by timestamp.
// The index of TrackEntry sorts blocks with the same timestamp.
// Place the audio track before the video track to meet WebM Interceptor Guidelines.
func NewMultiTrackBlockSorter(maxDelay int, rule BlockSorterRule) BlockInterceptor {
return &multiTrackBlockSorter{maxDelay: maxDelay, rule: rule}
// MultiTrackBlockSorterOption configures a MultiTrackBlockSorterOptions.
type MultiTrackBlockSorterOption func(*MultiTrackBlockSorterOptions) error

// MultiTrackBlockSorterOptions stores options for BlockWriter.
type MultiTrackBlockSorterOptions struct {
maxDelayedPackets int
rule BlockSorterRule
maxTimescaleDelay int64
}

// WithMaxDelayedPackets set the maximum number of packets that may be delayed
// within each track.
func WithMaxDelayedPackets(maxDelayedPackets int) MultiTrackBlockSorterOption {
return func(o *MultiTrackBlockSorterOptions) error {
o.maxDelayedPackets = maxDelayedPackets
return nil
}
}

// WithSortRule set the sort rule to apply to how packet ordering should be
// treated within the webm container.
func WithSortRule(rule BlockSorterRule) MultiTrackBlockSorterOption {
return func(o *MultiTrackBlockSorterOptions) error {
o.rule = rule
return nil
}
}

// WithMaxTimescaleDelay set the maximum allowed delay between tracks for a
// given timescale.
func WithMaxTimescaleDelay(maxTimescaleDelay int64) MultiTrackBlockSorterOption {
return func(o *MultiTrackBlockSorterOptions) error {
o.maxTimescaleDelay = maxTimescaleDelay
return nil
}
}

// NewMultiTrackBlockSorter creates BlockInterceptor, which sorts blocks on
// multiple tracks by timestamp. Either WithMaxDelayedPackets or
// WithMaxTimescaleDelay must be specified. If both are specified, then the
// first rule that is satisfied causes the packets to get written (thus a
// backlog of a max packets or max time scale will cause any older packets than
// the one satisfying the rule to be discarded). The index of TrackEntry sorts
// blocks with the same timestamp. Place the audio track before the video track
// to meet WebM Interceptor Guidelines.
func NewMultiTrackBlockSorter(opts ...MultiTrackBlockSorterOption) (BlockInterceptor, error) {
applyOptions := []MultiTrackBlockSorterOption{
WithMaxDelayedPackets(0),
WithSortRule(BlockSorterDropOutdated),
WithMaxTimescaleDelay(0),
}
applyOptions = append(applyOptions, opts...)

options := &MultiTrackBlockSorterOptions{}
for _, o := range applyOptions {
if err := o(options); err != nil {
return nil, err
}
}

if options.maxDelayedPackets == 0 && options.maxTimescaleDelay == 0 {
return nil, fmt.Errorf("must specify either WithMaxDelayedPackets(...) or WithMaxTimescaleDelay(...) with a non-0 value")
}

return &multiTrackBlockSorter{options: *options}, nil
}

type multiTrackBlockSorter struct {
maxDelay int
rule BlockSorterRule
options MultiTrackBlockSorterOptions
}

func (s *multiTrackBlockSorter) Intercept(r []BlockReader, w []BlockWriter) {
Expand Down Expand Up @@ -114,22 +183,36 @@ func (s *multiTrackBlockSorter) Intercept(r []BlockReader, w []BlockWriter) {
nChReq = len(r)
}
for {
var largestTimestampDelta int64
var tOldest int64
var tNewest int64
var nCh, nMax int
var bOldest *frameBuffer
var bNewest *frameBuffer
for _, b := range buf {
if n := b.Size(); n > 0 {
nCh++
if f := b.Head(); f.timestamp < tOldest || bOldest == nil {
tOldest = f.timestamp
bOldest = b
}
if f := b.Tail(); f.timestamp > tNewest || bNewest == nil {
tNewest = f.timestamp
bNewest = b

tDiff := tNewest - tOldest
if tDiff > largestTimestampDelta {
largestTimestampDelta = tDiff
}
}
if n > nMax {
nMax = n
}
}
}
if nCh >= nChReq || nMax > s.maxDelay {
if nCh >= nChReq ||
(nMax > s.options.maxDelayedPackets && s.options.maxDelayedPackets != 0) ||
(largestTimestampDelta > s.options.maxTimescaleDelay && s.options.maxTimescaleDelay != 0) {
fOldest := bOldest.Pop()
_, _ = w[fOldest.trackNumber].Write(fOldest.keyframe, fOldest.timestamp, fOldest.b)
tDone = fOldest.timestamp
Expand All @@ -142,7 +225,7 @@ func (s *multiTrackBlockSorter) Intercept(r []BlockReader, w []BlockWriter) {
for {
select {
case d := <-ch:
if d.timestamp >= tDone || s.rule == BlockSorterWriteOutdated {
if d.timestamp >= tDone || s.options.rule == BlockSorterWriteOutdated {
buf[d.trackNumber].Push(d)
flush(false)
}
Expand Down
Loading

0 comments on commit 846a52f

Please sign in to comment.