diff --git a/tailer/fileTailer.go b/tailer/fileTailer.go index 2e5f8db1..7004c987 100644 --- a/tailer/fileTailer.go +++ b/tailer/fileTailer.go @@ -107,20 +107,15 @@ func runFileTailer(path string, readall bool, failOnMissingFile bool, logger sim if eventLoop != nil { defer eventLoop.Close() } - reader := NewBufferedLineReader() + reader := NewBufferedLineReader(lines, done) if file != nil { // process all pre-existing lines - freshLines, err := reader.ReadAvailableLines(file) + finished, err := reader.ReadAvailableLines(file) if err != nil { writeError(errors, done, err, "failed to initialize file system watcher for %v", path) return - } - for _, line := range freshLines { - select { - case <-done: - return - case lines <- line: - } + } else if finished { + return } } @@ -153,19 +148,11 @@ func runFileTailer(path string, readall bool, failOnMissingFile bool, logger sim } return } - var freshLines []string - file, freshLines, err = evnts.Process(file, reader, abspath, logger) + file, err = evnts.Process(file, reader, abspath, logger) if err != nil { writeError(errors, done, err, "failed to watch %v", abspath) return } - for _, line := range freshLines { - select { - case <-done: - return - case lines <- line: - } - } } } }() diff --git a/tailer/fileTailer_darwin.go b/tailer/fileTailer_darwin.go index 998ca46f..506cbdee 100644 --- a/tailer/fileTailer_darwin.go +++ b/tailer/fileTailer_darwin.go @@ -139,9 +139,8 @@ func (l *eventLoop) Events() chan Events { return l.events } -func (events *eventList) Process(fileBefore *File, reader *bufferedLineReader, abspath string, logger simpleLogger) (file *File, lines []string, err error) { +func (events *eventList) Process(fileBefore *File, reader *bufferedLineReader, abspath string, logger simpleLogger) (file *File, err error) { file = fileBefore - lines = []string{} var truncated bool logger.Debug("File system watcher received %v event(s):\n", len(events.events)) for i, event := range events.events { @@ -167,12 +166,11 @@ func (events *eventList) Process(fileBefore *File, reader *bufferedLineReader, a // Handle write event. for _, event := range events.events { if file != nil && event.Ident == fdToInt(file.Fd()) && event.Fflags&syscall.NOTE_WRITE == syscall.NOTE_WRITE { - var freshLines []string - freshLines, err = reader.ReadAvailableLines(file) - if err != nil { + var finished bool + finished, err = reader.ReadAvailableLines(file) + if finished || err != nil { return } - lines = append(lines, freshLines...) } } @@ -196,12 +194,11 @@ func (events *eventList) Process(fileBefore *File, reader *bufferedLineReader, a return } reader.Clear() - var freshLines []string - freshLines, err = reader.ReadAvailableLines(file) - if err != nil { + var finished bool + finished, err = reader.ReadAvailableLines(file) + if finished || err != nil { return } - lines = append(lines, freshLines...) } else { // If file could not be opened, the CREATE event was for another file, we ignore this. err = nil diff --git a/tailer/fileTailer_linux.go b/tailer/fileTailer_linux.go index 1f192736..eba34609 100644 --- a/tailer/fileTailer_linux.go +++ b/tailer/fileTailer_linux.go @@ -141,9 +141,8 @@ func (l *eventLoop) Events() chan Events { return l.events } -func (events eventList) Process(fileBefore *File, reader *bufferedLineReader, abspath string, logger simpleLogger) (file *File, lines []string, err error) { +func (events eventList) Process(fileBefore *File, reader *bufferedLineReader, abspath string, logger simpleLogger) (file *File, err error) { file = fileBefore - lines = []string{} filename := filepath.Base(abspath) var truncated bool for _, event := range events { @@ -163,12 +162,11 @@ func (events eventList) Process(fileBefore *File, reader *bufferedLineReader, ab return } } - var freshLines []string - freshLines, err = reader.ReadAvailableLines(file) - if err != nil { + var finished bool + finished, err = reader.ReadAvailableLines(file) + if finished || err != nil { return } - lines = append(lines, freshLines...) } } @@ -189,12 +187,11 @@ func (events eventList) Process(fileBefore *File, reader *bufferedLineReader, ab return } reader.Clear() - var freshLines []string - freshLines, err = reader.ReadAvailableLines(file) - if err != nil { + var finished bool + finished, err = reader.ReadAvailableLines(file) + if finished || err != nil { return } - lines = append(lines, freshLines...) } } return diff --git a/tailer/fileTailer_windows.go b/tailer/fileTailer_windows.go index 04e3ffea..b402ac22 100644 --- a/tailer/fileTailer_windows.go +++ b/tailer/fileTailer_windows.go @@ -105,9 +105,8 @@ func (l *eventLoop) Events() chan Events { return l.events } -func (event *event) Process(fileBefore *File, reader *bufferedLineReader, abspath string, logger simpleLogger) (file *File, lines []string, err error) { +func (event *event) Process(fileBefore *File, reader *bufferedLineReader, abspath string, logger simpleLogger) (file *File, err error) { file = fileBefore - lines = []string{} var truncated bool logger.Debug("File system watcher received %v.\n", event.String()) @@ -123,12 +122,11 @@ func (event *event) Process(fileBefore *File, reader *bufferedLineReader, abspat return } } - var freshLines []string - freshLines, err = reader.ReadAvailableLines(file) - if err != nil { + var finished bool + finished, err = reader.ReadAvailableLines(file) + if finished || err != nil { return } - lines = append(lines, freshLines...) } // MOVED_FROM or DELETE @@ -144,12 +142,11 @@ func (event *event) Process(fileBefore *File, reader *bufferedLineReader, abspat return } reader.Clear() - var freshLines []string - freshLines, err = reader.ReadAvailableLines(file) - if err != nil { + var finished bool + finished, err = reader.ReadAvailableLines(file) + if finished || err != nil { return } - lines = append(lines, freshLines...) } return } diff --git a/tailer/linereader.go b/tailer/linereader.go index 2789514a..a888df18 100644 --- a/tailer/linereader.go +++ b/tailer/linereader.go @@ -22,60 +22,66 @@ import ( type bufferedLineReader struct { remainingBytesFromLastRead []byte + + // channels are used to stream the results out + lines chan<- string + done <-chan struct{} } -func NewBufferedLineReader() *bufferedLineReader { +func NewBufferedLineReader(lines chan<- string, done <-chan struct{}) *bufferedLineReader { return &bufferedLineReader{ remainingBytesFromLastRead: []byte{}, + lines: lines, + done: done, } } -func (r *bufferedLineReader) ReadAvailableLines(file io.Reader) ([]string, error) { - var lines []string - newBytes, err := read2EOF(file) - if err != nil { - return nil, err - } - lines, r.remainingBytesFromLastRead = splitLines(append(r.remainingBytesFromLastRead, newBytes...)) - return lines, nil -} - -func (r *bufferedLineReader) Clear() { - r.remainingBytesFromLastRead = []byte{} -} +func (r *bufferedLineReader) ReadAvailableLines(file io.Reader) (bool, error) { -func read2EOF(file io.Reader) ([]byte, error) { - result := make([]byte, 0) + // for each buffer, split lines and stream buf := make([]byte, 512) + var done bool + for { n, err := file.Read(buf) if n > 0 { // Callers should always process the n > 0 bytes returned before considering the error err. - result = append(result, buf[0:n]...) + result := append(r.remainingBytesFromLastRead, buf[0:n]...) + done, r.remainingBytesFromLastRead = r.processLines(result) + if done { + return true, nil + } } if err != nil { if err == io.EOF { - return result, nil + return false, nil } else { - return nil, fmt.Errorf("read error: %v", err.Error()) + return false, fmt.Errorf("read error: %v", err.Error()) } } } } -func splitLines(data []byte) (lines []string, remainingBytes []byte) { +func (r *bufferedLineReader) Clear() { + r.remainingBytesFromLastRead = []byte{} +} + +func (r *bufferedLineReader) processLines(data []byte) (finished bool, remainingBytes []byte) { newline := []byte("\n") - lines = make([]string, 0) - remainingBytes = make([]byte, 0) for _, line := range bytes.SplitAfter(data, newline) { if bytes.HasSuffix(line, newline) { line = bytes.TrimSuffix(line, newline) line = bytes.TrimSuffix(line, []byte("\r")) // Needed for CRLF line endings? - lines = append(lines, string(line)) + select { + case r.lines <- string(line): + case <-r.done: + finished = true + return + } } else { // This is the last (incomplete) line returned by SplitAfter(). We will exit the for loop here. remainingBytes = line } } - return lines, remainingBytes + return } diff --git a/tailer/linereader_test.go b/tailer/linereader_test.go index 5f08c58e..7f937091 100644 --- a/tailer/linereader_test.go +++ b/tailer/linereader_test.go @@ -45,26 +45,73 @@ func (f *mockFile) Read(p []byte) (int, error) { } } +func collectLines(linechan chan string) []string { + lines := []string{} + for { + select { + case line := <-linechan: + lines = append(lines, line) + default: + return lines + } + } +} + func TestLineReader(t *testing.T) { - file := NewMockFile("This is l", "ine 1\n", "This is line two\nThis is line three\n", "This ", "is ", "line 4", "\n", "\n") - reader := NewBufferedLineReader() - - lines, err := reader.ReadAvailableLines(file) - expectEmpty(t, lines, err) - lines, err = reader.ReadAvailableLines(file) - expectLines(t, lines, err, "This is line 1") - lines, err = reader.ReadAvailableLines(file) - expectLines(t, lines, err, "This is line two", "This is line three") - lines, err = reader.ReadAvailableLines(file) // This - expectEmpty(t, lines, err) - lines, err = reader.ReadAvailableLines(file) // is - expectEmpty(t, lines, err) - lines, err = reader.ReadAvailableLines(file) // line 4 - expectEmpty(t, lines, err) - lines, err = reader.ReadAvailableLines(file) // \n - expectLines(t, lines, err, "This is line 4") - lines, err = reader.ReadAvailableLines(file) // \n - expectLines(t, lines, err, "") + file := NewMockFile("This is l", "ine 1\n", "This is line two\nThis is line three\n", "This ", "is ", "line 4", "\n", "\n", "\n") + + done := make(chan struct{}) + linechan := make(chan string, 20) + + reader := NewBufferedLineReader(linechan, done) + + finished, err := reader.ReadAvailableLines(file) + expectEmpty(t, collectLines(linechan), err) + expectNotFinished(t, finished) + + finished, err = reader.ReadAvailableLines(file) + expectLines(t, collectLines(linechan), err, "This is line 1") + expectNotFinished(t, finished) + + finished, err = reader.ReadAvailableLines(file) + expectLines(t, collectLines(linechan), err, "This is line two", "This is line three") + expectNotFinished(t, finished) + + finished, err = reader.ReadAvailableLines(file) // This + expectEmpty(t, collectLines(linechan), err) + expectNotFinished(t, finished) + + finished, err = reader.ReadAvailableLines(file) // is + expectEmpty(t, collectLines(linechan), err) + expectNotFinished(t, finished) + + finished, err = reader.ReadAvailableLines(file) // line 4 + expectEmpty(t, collectLines(linechan), err) + expectNotFinished(t, finished) + + finished, err = reader.ReadAvailableLines(file) // \n + expectLines(t, collectLines(linechan), err, "This is line 4") + expectNotFinished(t, finished) + + finished, err = reader.ReadAvailableLines(file) // \n + expectLines(t, collectLines(linechan), err, "") + expectNotFinished(t, finished) + + close(done) + finished, err = reader.ReadAvailableLines(file) // \n + expectFinished(t, finished) +} + +func expectNotFinished(t *testing.T, finished bool) { + if finished { + t.Error("expected to be not finished, but finished") + } +} + +func expectFinished(t *testing.T, finished bool) { + if !finished { + t.Error("expected to be finished, but not finished") + } } func expectEmpty(t *testing.T, lines []string, err error) { diff --git a/tailer/pollingFileTailer.go b/tailer/pollingFileTailer.go index 8d9f4285..9e0cc8ad 100644 --- a/tailer/pollingFileTailer.go +++ b/tailer/pollingFileTailer.go @@ -83,11 +83,10 @@ func (l *pollingEventLoop) Events() chan Events { return l.events } -func (e *pollingEvent) Process(fileBefore *File, reader *bufferedLineReader, abspath string, logger simpleLogger) (file *File, lines []string, err error) { +func (e *pollingEvent) Process(fileBefore *File, reader *bufferedLineReader, abspath string, logger simpleLogger) (file *File, err error) { var ( - truncated, moved bool - freshLines []string - filename string + truncated, moved, finished bool + filename string ) file = fileBefore moved, err = file.CheckMoved() @@ -95,13 +94,15 @@ func (e *pollingEvent) Process(fileBefore *File, reader *bufferedLineReader, abs return } if moved { - freshLines, err = reader.ReadAvailableLines(file) + finished, err = reader.ReadAvailableLines(file) if err != nil { return } - lines = append(lines, freshLines...) filename = file.Name() file.Close() + if finished { + return + } file, err = open(filename) if err != nil { return @@ -117,10 +118,9 @@ func (e *pollingEvent) Process(fileBefore *File, reader *bufferedLineReader, abs return } } - freshLines, err = reader.ReadAvailableLines(file) + finished, err = reader.ReadAvailableLines(file) if err != nil { return } - lines = append(lines, freshLines...) return } diff --git a/tailer/watcher.go b/tailer/watcher.go index b097e4ee..1af56be0 100644 --- a/tailer/watcher.go +++ b/tailer/watcher.go @@ -34,5 +34,5 @@ type EventLoop interface { // File system events. // The operating system may return more than one event for each read, so it's plural. type Events interface { - Process(fileBefore *File, reader *bufferedLineReader, abspath string, logger simpleLogger) (file *File, lines []string, err error) + Process(fileBefore *File, reader *bufferedLineReader, abspath string, logger simpleLogger) (file *File, err error) }