Skip to content

Commit

Permalink
io.Seeker interface implementation (#10)
Browse files Browse the repository at this point in the history
Support io.Seeker
  • Loading branch information
Xmister authored and klauspost committed Feb 17, 2019
1 parent 7f90b27 commit 4ca2510
Show file tree
Hide file tree
Showing 3 changed files with 259 additions and 9 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ os:
- osx

go:
- 1.6.x
- 1.7.x
- 1.8.x
- 1.9.x
- 1.10.x
- 1.11.x
- master

script:
Expand Down
119 changes: 114 additions & 5 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,23 @@ import (
"io"
)

type seekable struct {
*reader
}

type ReadSeekCloser interface {
io.ReadCloser
io.Seeker
}

type reader struct {
in io.Reader // Input reader
closer io.Closer // Optional closer
ready chan *buffer // Buffers ready to be handed to the reader
reuse chan *buffer // Buffers to reuse for input reading
exit chan struct{} // Closes when finished
buffers int // Number of buffers
size int // Size of each buffer
err error // If an error has occurred it is here
cur *buffer // Current buffer being served
exited chan struct{} // Channel is closed been the async reader shuts down
Expand All @@ -38,6 +48,8 @@ type reader struct {
//
// The input can be read from the returned reader.
// When done use Close() to release the buffers.
// If a reader supporting the io.Seeker is given,
// the returned reader will also support it.
func NewReader(rd io.Reader) io.ReadCloser {
if rd == nil {
return nil
Expand All @@ -61,6 +73,8 @@ func NewReader(rd io.Reader) io.ReadCloser {
// The input can be read from the returned reader.
// When done use Close() to release the buffers,
// which will also close the supplied closer.
// If a reader supporting the io.Seeker is given,
// the returned reader will also support it.
func NewReadCloser(rd io.ReadCloser) io.ReadCloser {
if rd == nil {
return nil
Expand All @@ -75,10 +89,39 @@ func NewReadCloser(rd io.ReadCloser) io.ReadCloser {
return ret
}

// New returns a reader that will asynchronously read from
// the supplied reader into 4 buffers of 1MB each.
//
// It will start reading from the input at once, maybe even before this
// function has returned.
//
// The input can be read and seeked from the returned reader.
// When done use Close() to release the buffers.
func NewReadSeeker(rd io.ReadSeeker) ReadSeekCloser {
//Not checking for result as the input interface guarantees it's seekable
res, _ := NewReader(rd).(ReadSeekCloser)
return res
}

// New returns a reader that will asynchronously read from
// the supplied reader into 4 buffers of 1MB each.
//
// It will start reading from the input at once, maybe even before this
// function has returned.
//
// The input can be read and seeked from the returned reader.
// When done use Close() to release the buffers,
// which will also close the supplied closer.
func NewReadSeekCloser(rd ReadSeekCloser) ReadSeekCloser {
//Not checking for result as the input interface guarantees it's seekable
res, _ := NewReadCloser(rd).(ReadSeekCloser)
return res
}

// NewReaderSize returns a reader with a custom number of buffers and size.
// buffers is the number of queued buffers and size is the size of each
// buffer in bytes.
func NewReaderSize(rd io.Reader, buffers, size int) (io.ReadCloser, error) {
func NewReaderSize(rd io.Reader, buffers, size int) (res io.ReadCloser, err error) {
if size <= 0 {
return nil, fmt.Errorf("buffer size too small")
}
Expand All @@ -89,14 +132,19 @@ func NewReaderSize(rd io.Reader, buffers, size int) (io.ReadCloser, error) {
return nil, fmt.Errorf("nil input reader supplied")
}
a := &reader{}
if _, ok := rd.(io.Seeker); ok {
res = &seekable{a}
} else {
res = a
}
a.init(rd, buffers, size)
return a, nil
return
}

// NewReadCloserSize returns a reader with a custom number of buffers and size.
// buffers is the number of queued buffers and size is the size of each
// buffer in bytes.
func NewReadCloserSize(rc io.ReadCloser, buffers, size int) (io.ReadCloser, error) {
func NewReadCloserSize(rc io.ReadCloser, buffers, size int) (res io.ReadCloser, err error) {
if size <= 0 {
return nil, fmt.Errorf("buffer size too small")
}
Expand All @@ -107,8 +155,39 @@ func NewReadCloserSize(rc io.ReadCloser, buffers, size int) (io.ReadCloser, erro
return nil, fmt.Errorf("nil input reader supplied")
}
a := &reader{closer: rc}
if _, ok := rc.(io.Seeker); ok {
res = &seekable{a}
} else {
res = a
}
a.init(rc, buffers, size)
return a, nil
return
}

// NewReadSeekerSize returns a reader with a custom number of buffers and size.
// buffers is the number of queued buffers and size is the size of each
// buffer in bytes.
func NewReadSeekerSize(rd io.ReadSeeker, buffers, size int) (res ReadSeekCloser, err error) {
reader, err := NewReaderSize(rd, buffers, size)
if err != nil {
return nil, err
}
//Not checking for result as the input interface guarantees it's seekable
res, _ = reader.(ReadSeekCloser)
return
}

// NewReadSeekCloserSize returns a reader with a custom number of buffers and size.
// buffers is the number of queued buffers and size is the size of each
// buffer in bytes.
func NewReadSeekCloserSize(rd ReadSeekCloser, buffers, size int) (res ReadSeekCloser, err error) {
reader, err := NewReadCloserSize(rd, buffers, size)
if err != nil {
return nil, err
}
//Not checking for result as the input interface guarantees it's seekable
res, _ = reader.(ReadSeekCloser)
return
}

// initialize the reader
Expand All @@ -119,7 +198,9 @@ func (a *reader) init(rd io.Reader, buffers, size int) {
a.exit = make(chan struct{}, 0)
a.exited = make(chan struct{}, 0)
a.buffers = buffers
a.size = size
a.cur = nil
a.err = nil

// Create buffers
for i := 0; i < buffers; i++ {
Expand All @@ -130,13 +211,13 @@ func (a *reader) init(rd io.Reader, buffers, size int) {
go func() {
// Ensure that when we exit this is signalled.
defer close(a.exited)
defer close(a.ready)
for {
select {
case b := <-a.reuse:
err := b.read(a.in)
a.ready <- b
if err != nil {
close(a.ready)
return
}
case <-a.exit:
Expand Down Expand Up @@ -183,6 +264,34 @@ func (a *reader) Read(p []byte) (n int, err error) {
return n, nil
}

func (a *seekable) Seek(offset int64, whence int) (res int64, err error) {
//Not checking the result as seekable receiver guarantees it to be assertable
seeker, _ := a.in.(io.Seeker)
//Make sure the async routine is closed
select {
case <-a.exited:
case a.exit <- struct{}{}:
<-a.exited
}
if whence == io.SeekCurrent {
//If need to seek based on current position, take into consideration the bytes we read but the consumer
//doesn't know about
err = nil
for a.cur != nil {
if err = a.fill(); err == nil && a.cur != nil {
offset -= int64(len(a.cur.buffer()))
a.cur.offset = len(a.cur.buf)
}
}
}
//Seek the actual Seeker
if res, err = seeker.Seek(offset, whence); err == nil {
//If the seek was successful, reinitalize ourselves (with the new position).
a.init(a.in, a.buffers, a.size)
}
return
}

// WriteTo writes data to w until there's no more data to write or when an error occurs.
// The return value n is the number of bytes written.
// Any error encountered during the write is also returned.
Expand Down
145 changes: 143 additions & 2 deletions reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,13 @@ import (
"bytes"
"errors"
"fmt"
"github.com/klauspost/readahead"
"io"
"io/ioutil"
"strings"
"sync"
"testing"
"testing/iotest"

"github.com/klauspost/readahead"
)

func TestReader(t *testing.T) {
Expand Down Expand Up @@ -65,6 +64,148 @@ func TestReader(t *testing.T) {
}
}

type SeekerBuffer struct {
*bytes.Buffer
pos int64
}

func (s *SeekerBuffer) Read(p []byte) (n int, err error) {
n, err = bytes.NewReader(s.Bytes()[s.pos:]).Read(p)
if n > 0 {
s.pos += int64(n)
}
return
}

func (s *SeekerBuffer) Seek(offset int64, whence int) (res int64, err error) {
if offset > int64(s.Len()) {
err = fmt.Errorf("wrong offset")
return
}
switch whence {
case io.SeekStart:
res = offset
case io.SeekCurrent:
res = s.pos + offset
case io.SeekEnd:
res = int64(s.Len()) + offset
}
s.pos = res
return
}

func TestSeeker(t *testing.T) {
testBytes := []byte("Testbuffer")
newControl := func(i int) io.Reader {
buf := bytes.NewBuffer(testBytes)
for j := 0; j < i*100-1; j++ {
buf.Write(testBytes)
}
return buf
}
for i := 1; i <= 100; i++ {
length := len(testBytes) * i * 100
buf := &SeekerBuffer{
Buffer: bytes.NewBuffer(testBytes),
}
for j := 0; j < i*100-1; j++ {
buf.Write(testBytes)
}
control := newControl(i)
ar, err := readahead.NewReadSeekerSize(buf, i, 11*i)
if _, ok := control.(io.Seeker); ok {
t.Fatal("created reader implements seeking without underlying reader support")
}
if err != nil {
t.Fatal("error when creating:", err)
}
dstSize := 3 * i
dst := make([]byte, dstSize)
controlDst := make([]byte, dstSize)
control.Read(controlDst)
n, err := ar.Read(dst)
if err != nil {
t.Fatal("error when reading:", err)
}
if n != dstSize {
t.Fatal("unexpected length, expected ", dstSize, ", got ", n)
}
if string(dst) != string(controlDst) {
t.Fatal("seeker and control reader mismatch")
}

pos, err := ar.Seek(1, io.SeekStart)
if err != nil {
t.Fatal("error when seeking:", err)
}
if pos != 1 {
t.Fatal("unexpected position, expected 1, got ", pos)
}
control = newControl(i)
control.Read(make([]byte, 1)) //Emulate seeking to offset 1 from beginning
control.Read(controlDst)
n, err = ar.Read(dst)
if err != nil {
t.Fatal("error when reading:", err)
}
if n != dstSize {
t.Fatal("unexpected length, expected ", dstSize, ", got ", n)
}
if string(dst) != string(controlDst) {
t.Fatal("seeker and control reader mismatch")
}

pos, err = ar.Seek(int64(i), io.SeekCurrent)
if err != nil {
t.Fatal("error when seeking:", err)
}
if pos != int64(dstSize+i+1) {
t.Fatal("unexpected position, expected ", dstSize+i, ", got ", pos)
}
control.Read(make([]byte, int64(i))) //Emulate seeking to offset 1 from current pos
control.Read(controlDst)
n, err = ar.Read(dst)
if err != nil {
t.Fatal("error when reading:", err)
}
if n != dstSize {
t.Fatal("unexpected length, expected ", dstSize, ", got ", n)
}
if string(dst) != string(controlDst) {
t.Fatal("seeker and control reader mismatch")
}

control = newControl(i)
pos, err = ar.Seek(-1, io.SeekEnd)
if err != nil {
t.Fatal("error when seeking:", err)
}
if pos != int64(length-1) {
t.Fatal("unexpected position, expected ", length-1, ", got ", pos)
}
control.Read(make([]byte, length-1)) //Emulate seeking to offset -1 from the end
control.Read(controlDst)
n, err = ar.Read(dst)
if err != nil {
t.Fatal("error when reading:", err)
}
if n != 1 {
t.Fatal("unexpected length, expected 1, got ", n)
}
if string(dst[:n]) != string(controlDst[:n]) {
t.Fatal("seeker and control reader mismatch")
}

n, err = ar.Read(dst)
if err != io.EOF {
t.Fatal("expected io.EOF, got", err)
}
if n != 0 {
t.Fatal("unexpected length, expected 0, got ", n)
}
}
}

type testCloser struct {
io.Reader
closed int
Expand Down

0 comments on commit 4ca2510

Please sign in to comment.