Skip to content

Commit

Permalink
#5 bugfixes and new tests for the multiple logfile tailer
Browse files Browse the repository at this point in the history
  • Loading branch information
fstab committed May 25, 2019
1 parent f0aa540 commit b6b5e1a
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 16 deletions.
9 changes: 8 additions & 1 deletion tailer/bufferedTailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
type bufferedTailer struct {
out chan *fswatcher.Line
orig fswatcher.FileTailer
done chan struct{}
}

func (b *bufferedTailer) Lines() chan *fswatcher.Line {
Expand All @@ -35,6 +36,7 @@ func (b *bufferedTailer) Errors() chan fswatcher.Error {

func (b *bufferedTailer) Close() {
b.orig.Close()
close(b.done)
}

func BufferedTailer(orig fswatcher.FileTailer) fswatcher.FileTailer {
Expand Down Expand Up @@ -103,6 +105,7 @@ func BufferedTailer(orig fswatcher.FileTailer) fswatcher.FileTailer {
func BufferedTailerWithMetrics(orig fswatcher.FileTailer, bufferLoadMetric BufferLoadMetric, log logrus.FieldLogger, maxLinesInBuffer int) fswatcher.FileTailer {
buffer := NewLineBuffer()
out := make(chan *fswatcher.Line)
done := make(chan struct{})

// producer
go func() {
Expand Down Expand Up @@ -135,12 +138,16 @@ func BufferedTailerWithMetrics(orig fswatcher.FileTailer, bufferLoadMetric Buffe
return
}
bufferLoadMetric.Dec()
out <- line
select {
case out <- line:
case <-done:
}
}
}()
return &bufferedTailer{
out: out,
orig: orig,
done: done,
}
}

Expand Down
9 changes: 7 additions & 2 deletions tailer/fswatcher/file_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package fswatcher
import (
"io"
"os"
"syscall"
)

// On macOS, we keep dirs open, so we use *os.File.
Expand Down Expand Up @@ -44,8 +45,12 @@ func (d *Dir) ls() ([]os.FileInfo, Error) {
return fileInfos, nil
}

func NewFile(orig *os.File, newPath string) *os.File {
return os.NewFile(orig.Fd(), newPath)
func NewFile(orig *os.File, newPath string) (*os.File, error) {
fd, err := syscall.Dup(int(orig.Fd()))
if err != nil {
return nil, err
}
return os.NewFile(uintptr(fd), newPath), nil
}

func open(path string) (*os.File, error) {
Expand Down
14 changes: 11 additions & 3 deletions tailer/fswatcher/file_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@

package fswatcher

import "os"
import (
"os"
"syscall"
)

// On Linux, we don't need to keep the directory open, but we need to keep an open watch descriptor handle.
type Dir struct {
Expand Down Expand Up @@ -45,8 +48,13 @@ func (d *Dir) ls() ([]os.FileInfo, Error) {
return fileInfos, nil
}

func NewFile(orig *os.File, newPath string) *os.File {
return os.NewFile(orig.Fd(), newPath)
func NewFile(orig *os.File, newPath string) (*os.File, error) {
// The finalizer will close orig.Fd() even if we don't close it explicitly. Therefore we must Dup().
fd, err := syscall.Dup(int(orig.Fd()))
if err != nil {
return nil, err
}
return os.NewFile(uintptr(fd), newPath), nil
}

func open(path string) (*os.File, error) {
Expand Down
4 changes: 2 additions & 2 deletions tailer/fswatcher/file_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,13 @@ func (d *Dir) ls() ([]*fileInfo, Error) {
}

// like os.NewFile()
func NewFile(orig *File, newPath string) *File {
func NewFile(orig *File, newPath string) (*File, error) {
return &File{
path: newPath,
currentPos: orig.currentPos,
fileIndexLow: orig.fileIndexLow,
fileIndexHigh: orig.fileIndexHigh,
}
}, nil
}

func open(path string) (*File, Error) {
Expand Down
9 changes: 8 additions & 1 deletion tailer/fswatcher/fseventProducerLoop_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"strings"
"syscall"
"time"
"unsafe"
)

Expand Down Expand Up @@ -71,9 +72,15 @@ func runInotifyLoop(fd int) *inotifyloop {
for {
n, err = syscall.Read(l.fd, buf)
if err != nil {
// Getting an err might be part of the shutdown, when l.fd is closed.
// We decide whether it is an actual error or not by checking if l.done is closed.
select {
case l.errors <- NewError(NotSpecified, err, "failed to read inotify events"):
case <-l.done:
case <-time.After(2 * time.Second):
select {
case l.errors <- NewError(NotSpecified, err, "failed to read inotify events"):
case <-l.done:
}
}
return
}
Expand Down
17 changes: 13 additions & 4 deletions tailer/fswatcher/fswatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,8 @@ func runFileTailer(initFunc func() (fswatcher, Error), globs []glob.Glob, readal
select {
case <-t.done:
case t.errors <- Err:
return
}
return
}
}

Expand Down Expand Up @@ -204,7 +204,7 @@ func (t *fileTailer) shutdown() {
close(t.errors)

warnf := func(format string, args ...interface{}) {
log.Warnf("error while shutting down the file system watcher: %v", fmt.Sprint(format, args))
log.Warnf("error while shutting down the file system watcher: %v", fmt.Sprintf(format, args))
}

for _, dir := range t.watchedDirs {
Expand Down Expand Up @@ -278,7 +278,11 @@ func (t *fileTailer) syncFilesInDir(dir *Dir, readall bool, log logrus.FieldLogg
if alreadyWatched != nil {
if alreadyWatched.file.Name() != filePath {
fileLogger.WithField("fd", alreadyWatched.file.Fd()).Infof("file was moved from %v", alreadyWatched.file.Name())
alreadyWatched.file = NewFile(alreadyWatched.file, filePath)
oldFileWithNewPath, err := NewFile(alreadyWatched.file, filePath)
if err != nil {
return NewErrorf(NotSpecified, err, "%v: failed to follow moved file", filePath)
}
alreadyWatched.file = oldFileWithNewPath
} else {
fileLogger.Debug("skipping, because file is already watched")
}
Expand All @@ -287,7 +291,12 @@ func (t *fileTailer) syncFilesInDir(dir *Dir, readall bool, log logrus.FieldLogg
}
newFile, err := open(filePath)
if err != nil {
return NewErrorf(NotSpecified, err, "%v: failed to open file", filePath)
if os.IsNotExist(err) {
fileLogger.Debug("skipping, because file does no longer exist")
continue
} else {
return NewErrorf(NotSpecified, err, "%v: failed to open file", filePath)
}
}
if !readall {
_, err = newFile.Seek(0, io.SeekEnd)
Expand Down
43 changes: 40 additions & 3 deletions tailer/fswatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,38 @@ const tests = `
- [expect, test line 3 dir 1 file 2, logdir1/logfile-2.log]
- [expect, test line 3 dir 2 file 1, logdir2/logfile-1.log]
- [expect, test line 3 dir 2 file 2, logdir2/logfile-2.log]
- name: nested directories
commands:
- [mkdir, outer]
- [mkdir, outer/inner]
- [log, outer line 1, outer/logfile.log]
- [log, inner line 1, outer/inner/logfile.log]
- [start file tailer, readall=true, fail_on_missing_logfile=false, outer/*.log, outer/inner/*.log]
- [expect, outer line 1, outer/logfile.log]
- [expect, inner line 1, outer/inner/logfile.log]
- [log, outer line 2, outer/logfile.log]
- [log, inner line 2, outer/inner/logfile.log]
- [expect, outer line 2, outer/logfile.log]
- [expect, inner line 2, outer/inner/logfile.log]
- [logrotate, outer/logfile.log, outer/logfile.log.1]
- [logrotate, outer/inner/logfile.log, outer/inner/logfile.log.1]
- [log, outer line 3, outer/logfile.log]
- [log, inner line 3, outer/inner/logfile.log]
- [expect, outer line 3, outer/logfile.log]
- [expect, inner line 3, outer/inner/logfile.log]
- name: watch after logrotate
commands:
- [mkdir, logdir]
- [log, line 1, logdir/logfile.log]
- [start file tailer, readall=true, fail_on_missing_logfile=false, logdir/*]
- [expect, line 1, logdir/logfile.log]
- [log, line 2, logdir/logfile.log]
- [expect, line 2, logdir/logfile.log]
- [logrotate, logdir/logfile.log, logdir/logfile.log.1]
- [log, line 3, logdir/logfile.log]
- [expect, line 3, logdir/logfile.log]
`

// // The following test fails on Windows in tearDown() when removing logdir.
Expand Down Expand Up @@ -209,14 +241,19 @@ func executeCommands(t *testing.T, ctx *context, cmds [][]string) {
for _, cmd := range cmds {
exec(t, ctx, cmd)
}
closeTailer(t, ctx)
// The "watch after logrotate" test watches logdir/* and rotates logdir/logfile.log
// to logdir/logfile.log.1. As a result, the file is still watched after it is rotated.
// Depending on the logrotate config the lines are read again (cp) or not (mv).
// We ignore unexpected lines for that test.
// TODO: Make ignoreUnexpectedLines an explicit paramter in the test yaml instead of using the test name here.
closeTailer(t, ctx, ctx.testName == "watch after logrotate")
assertGoroutinesTerminated(t, ctx, nGoroutinesBefore)
for _, writer := range ctx.logFileWriters {
writer.close(t, ctx)
}
}

func closeTailer(t *testing.T, ctx *context) {
func closeTailer(t *testing.T, ctx *context, ignoreUnexpectedLines bool) {
// Note: This function checks if the Lines() channel gets closed.
// While it's good to check this, it doesn't guarantee that the tailer is
// fully shut down. There might be an fseventProducerLoop running in the
Expand All @@ -230,7 +267,7 @@ func closeTailer(t *testing.T, ctx *context) {
// check if the lines channel gets closed
select {
case line, open := <-ctx.tailer.Lines():
if open {
if open && !ignoreUnexpectedLines {
fatalf(t, ctx, "read unexpected line line from file %q: %q", line.File, line.Line)
}
case <-time.After(timeout):
Expand Down

0 comments on commit b6b5e1a

Please sign in to comment.