Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tailer: don't load entire file into memory, stream it out bits at a time #48

Merged
merged 1 commit into from
Oct 28, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 5 additions & 18 deletions tailer/fileTailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,20 +107,15 @@ func runFileTailer(path string, readall bool, failOnMissingFile bool, logger sim
if eventLoop != nil {
defer eventLoop.Close()
}
reader := NewBufferedLineReader()
reader := NewBufferedLineReader(lines, done)
if file != nil {
// process all pre-existing lines
freshLines, err := reader.ReadAvailableLines(file)
finished, err := reader.ReadAvailableLines(file)
if err != nil {
writeError(errors, done, err, "failed to initialize file system watcher for %v", path)
return
}
for _, line := range freshLines {
select {
case <-done:
return
case lines <- line:
}
} else if finished {
return
}
}

Expand Down Expand Up @@ -153,19 +148,11 @@ func runFileTailer(path string, readall bool, failOnMissingFile bool, logger sim
}
return
}
var freshLines []string
file, freshLines, err = evnts.Process(file, reader, abspath, logger)
file, err = evnts.Process(file, reader, abspath, logger)
if err != nil {
writeError(errors, done, err, "failed to watch %v", abspath)
return
}
for _, line := range freshLines {
select {
case <-done:
return
case lines <- line:
}
}
}
}
}()
Expand Down
17 changes: 7 additions & 10 deletions tailer/fileTailer_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,8 @@ func (l *eventLoop) Events() chan Events {
return l.events
}

func (events *eventList) Process(fileBefore *File, reader *bufferedLineReader, abspath string, logger simpleLogger) (file *File, lines []string, err error) {
func (events *eventList) Process(fileBefore *File, reader *bufferedLineReader, abspath string, logger simpleLogger) (file *File, err error) {
file = fileBefore
lines = []string{}
var truncated bool
logger.Debug("File system watcher received %v event(s):\n", len(events.events))
for i, event := range events.events {
Expand All @@ -167,12 +166,11 @@ func (events *eventList) Process(fileBefore *File, reader *bufferedLineReader, a
// Handle write event.
for _, event := range events.events {
if file != nil && event.Ident == fdToInt(file.Fd()) && event.Fflags&syscall.NOTE_WRITE == syscall.NOTE_WRITE {
var freshLines []string
freshLines, err = reader.ReadAvailableLines(file)
if err != nil {
var finished bool
finished, err = reader.ReadAvailableLines(file)
if finished || err != nil {
return
}
lines = append(lines, freshLines...)
}
}

Expand All @@ -196,12 +194,11 @@ func (events *eventList) Process(fileBefore *File, reader *bufferedLineReader, a
return
}
reader.Clear()
var freshLines []string
freshLines, err = reader.ReadAvailableLines(file)
if err != nil {
var finished bool
finished, err = reader.ReadAvailableLines(file)
if finished || err != nil {
return
}
lines = append(lines, freshLines...)
} else {
// If file could not be opened, the CREATE event was for another file, we ignore this.
err = nil
Expand Down
17 changes: 7 additions & 10 deletions tailer/fileTailer_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,8 @@ func (l *eventLoop) Events() chan Events {
return l.events
}

func (events eventList) Process(fileBefore *File, reader *bufferedLineReader, abspath string, logger simpleLogger) (file *File, lines []string, err error) {
func (events eventList) Process(fileBefore *File, reader *bufferedLineReader, abspath string, logger simpleLogger) (file *File, err error) {
file = fileBefore
lines = []string{}
filename := filepath.Base(abspath)
var truncated bool
for _, event := range events {
Expand All @@ -163,12 +162,11 @@ func (events eventList) Process(fileBefore *File, reader *bufferedLineReader, ab
return
}
}
var freshLines []string
freshLines, err = reader.ReadAvailableLines(file)
if err != nil {
var finished bool
finished, err = reader.ReadAvailableLines(file)
if finished || err != nil {
return
}
lines = append(lines, freshLines...)
}
}

Expand All @@ -189,12 +187,11 @@ func (events eventList) Process(fileBefore *File, reader *bufferedLineReader, ab
return
}
reader.Clear()
var freshLines []string
freshLines, err = reader.ReadAvailableLines(file)
if err != nil {
var finished bool
finished, err = reader.ReadAvailableLines(file)
if finished || err != nil {
return
}
lines = append(lines, freshLines...)
}
}
return
Expand Down
17 changes: 7 additions & 10 deletions tailer/fileTailer_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,8 @@ func (l *eventLoop) Events() chan Events {
return l.events
}

func (event *event) Process(fileBefore *File, reader *bufferedLineReader, abspath string, logger simpleLogger) (file *File, lines []string, err error) {
func (event *event) Process(fileBefore *File, reader *bufferedLineReader, abspath string, logger simpleLogger) (file *File, err error) {
file = fileBefore
lines = []string{}
var truncated bool
logger.Debug("File system watcher received %v.\n", event.String())

Expand All @@ -123,12 +122,11 @@ func (event *event) Process(fileBefore *File, reader *bufferedLineReader, abspat
return
}
}
var freshLines []string
freshLines, err = reader.ReadAvailableLines(file)
if err != nil {
var finished bool
finished, err = reader.ReadAvailableLines(file)
if finished || err != nil {
return
}
lines = append(lines, freshLines...)
}

// MOVED_FROM or DELETE
Expand All @@ -144,12 +142,11 @@ func (event *event) Process(fileBefore *File, reader *bufferedLineReader, abspat
return
}
reader.Clear()
var freshLines []string
freshLines, err = reader.ReadAvailableLines(file)
if err != nil {
var finished bool
finished, err = reader.ReadAvailableLines(file)
if finished || err != nil {
return
}
lines = append(lines, freshLines...)
}
return
}
Expand Down
54 changes: 30 additions & 24 deletions tailer/linereader.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,60 +22,66 @@ import (

// bufferedLineReader splits the bytes read from a file into lines and streams
// every completed line out over a channel rather than returning them in bulk.
type bufferedLineReader struct {
// Trailing bytes of the most recent read that did not end in a newline.
// They are prepended to the next chunk so a line split across reads is reassembled.
remainingBytesFromLastRead []byte

// channels are used to stream the results out
// lines receives each completed line with its line terminator stripped.
lines chan<- string
// done is selected on alongside every send to lines; once it is readable the
// reader stops processing and reports that it is finished.
done <-chan struct{}
}

func NewBufferedLineReader() *bufferedLineReader {
// NewBufferedLineReader returns a reader with an empty carry-over buffer that
// sends completed lines to the lines channel and watches done while sending.
func NewBufferedLineReader(lines chan<- string, done <-chan struct{}) *bufferedLineReader {
	reader := new(bufferedLineReader)
	reader.remainingBytesFromLastRead = []byte{}
	reader.lines = lines
	reader.done = done
	return reader
}

func (r *bufferedLineReader) ReadAvailableLines(file io.Reader) ([]string, error) {
var lines []string
newBytes, err := read2EOF(file)
if err != nil {
return nil, err
}
lines, r.remainingBytesFromLastRead = splitLines(append(r.remainingBytesFromLastRead, newBytes...))
return lines, nil
}

// Clear discards any buffered partial line by resetting the carry-over
// buffer to an empty slice.
func (r *bufferedLineReader) Clear() {
r.remainingBytesFromLastRead = []byte{}
}
func (r *bufferedLineReader) ReadAvailableLines(file io.Reader) (bool, error) {

func read2EOF(file io.Reader) ([]byte, error) {
result := make([]byte, 0)
// for each buffer, split lines and stream
buf := make([]byte, 512)
var done bool

for {
n, err := file.Read(buf)
if n > 0 {
// Callers should always process the n > 0 bytes returned before considering the error err.
result = append(result, buf[0:n]...)
result := append(r.remainingBytesFromLastRead, buf[0:n]...)
done, r.remainingBytesFromLastRead = r.processLines(result)
if done {
return true, nil
}
}
if err != nil {
if err == io.EOF {
return result, nil
return false, nil
} else {
return nil, fmt.Errorf("read error: %v", err.Error())
return false, fmt.Errorf("read error: %v", err.Error())
}
}
}
}

func splitLines(data []byte) (lines []string, remainingBytes []byte) {
// Clear drops whatever incomplete line is still buffered from earlier reads,
// leaving the reader with an empty (non-nil) carry-over buffer.
func (r *bufferedLineReader) Clear() {
	r.remainingBytesFromLastRead = make([]byte, 0)
}

func (r *bufferedLineReader) processLines(data []byte) (finished bool, remainingBytes []byte) {
newline := []byte("\n")
lines = make([]string, 0)
remainingBytes = make([]byte, 0)
for _, line := range bytes.SplitAfter(data, newline) {
if bytes.HasSuffix(line, newline) {
line = bytes.TrimSuffix(line, newline)
line = bytes.TrimSuffix(line, []byte("\r")) // Needed for CRLF line endings?
lines = append(lines, string(line))
select {
case r.lines <- string(line):
case <-r.done:
finished = true
return
}
} else {
// This is the last (incomplete) line returned by SplitAfter(). We will exit the for loop here.
remainingBytes = line
}
}
return lines, remainingBytes
return
}
85 changes: 66 additions & 19 deletions tailer/linereader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,26 +45,73 @@ func (f *mockFile) Read(p []byte) (int, error) {
}
}

// collectLines drains every line that can be received from linechan without
// blocking and returns them in arrival order. The result is an empty, non-nil
// slice when nothing is available.
func collectLines(linechan chan string) []string {
	drained := []string{}
	for pending := true; pending; {
		select {
		case next := <-linechan:
			drained = append(drained, next)
		default:
			// Nothing is immediately available: stop instead of blocking.
			pending = false
		}
	}
	return drained
}

func TestLineReader(t *testing.T) {
file := NewMockFile("This is l", "ine 1\n", "This is line two\nThis is line three\n", "This ", "is ", "line 4", "\n", "\n")
reader := NewBufferedLineReader()

lines, err := reader.ReadAvailableLines(file)
expectEmpty(t, lines, err)
lines, err = reader.ReadAvailableLines(file)
expectLines(t, lines, err, "This is line 1")
lines, err = reader.ReadAvailableLines(file)
expectLines(t, lines, err, "This is line two", "This is line three")
lines, err = reader.ReadAvailableLines(file) // This
expectEmpty(t, lines, err)
lines, err = reader.ReadAvailableLines(file) // is
expectEmpty(t, lines, err)
lines, err = reader.ReadAvailableLines(file) // line 4
expectEmpty(t, lines, err)
lines, err = reader.ReadAvailableLines(file) // \n
expectLines(t, lines, err, "This is line 4")
lines, err = reader.ReadAvailableLines(file) // \n
expectLines(t, lines, err, "")
file := NewMockFile("This is l", "ine 1\n", "This is line two\nThis is line three\n", "This ", "is ", "line 4", "\n", "\n", "\n")

done := make(chan struct{})
linechan := make(chan string, 20)

reader := NewBufferedLineReader(linechan, done)

finished, err := reader.ReadAvailableLines(file)
expectEmpty(t, collectLines(linechan), err)
expectNotFinished(t, finished)

finished, err = reader.ReadAvailableLines(file)
expectLines(t, collectLines(linechan), err, "This is line 1")
expectNotFinished(t, finished)

finished, err = reader.ReadAvailableLines(file)
expectLines(t, collectLines(linechan), err, "This is line two", "This is line three")
expectNotFinished(t, finished)

finished, err = reader.ReadAvailableLines(file) // This
expectEmpty(t, collectLines(linechan), err)
expectNotFinished(t, finished)

finished, err = reader.ReadAvailableLines(file) // is
expectEmpty(t, collectLines(linechan), err)
expectNotFinished(t, finished)

finished, err = reader.ReadAvailableLines(file) // line 4
expectEmpty(t, collectLines(linechan), err)
expectNotFinished(t, finished)

finished, err = reader.ReadAvailableLines(file) // \n
expectLines(t, collectLines(linechan), err, "This is line 4")
expectNotFinished(t, finished)

finished, err = reader.ReadAvailableLines(file) // \n
expectLines(t, collectLines(linechan), err, "")
expectNotFinished(t, finished)

close(done)
finished, err = reader.ReadAvailableLines(file) // \n
expectFinished(t, finished)
}

// expectNotFinished records a test error when finished is true, i.e. when the
// reader reported completion although it was expected to keep going.
func expectNotFinished(t *testing.T, finished bool) {
	if !finished {
		return
	}
	t.Error("expected to be not finished, but finished")
}

// expectFinished records a test error when finished is false, i.e. when the
// reader was expected to report completion but did not.
func expectFinished(t *testing.T, finished bool) {
	if finished {
		return
	}
	t.Error("expected to be finished, but not finished")
}

func expectEmpty(t *testing.T, lines []string, err error) {
Expand Down
Loading