Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tailer: add option to start tailing at specified seek offset #47

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 28 additions & 7 deletions tailer/fileTailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,39 @@ func (f *fileTailer) Errors() chan Error {
return f.errors
}

func getSeekArgs(readall bool) (bool, int64, int) {
if readall {
return false, 0, 0
} else {
return true, 0, io.SeekEnd
}
}

func RunFseventFileTailer(path string, readall bool, failOnMissingFile bool, logger simpleLogger) Tailer {
return runFileTailer(path, readall, failOnMissingFile, logger, NewFseventWatcher)
seek, offset, whence := getSeekArgs(readall)
return runFileTailer(path, failOnMissingFile, logger, seek, offset, whence, NewFseventWatcher)
}

func RunFseventFileTailerWithSeek(path string, failOnMissingFile bool, logger simpleLogger, offset int64, whence int) Tailer {
return runFileTailer(path, failOnMissingFile, logger, true, offset, whence, NewFseventWatcher)
}

func RunPollingFileTailer(path string, readall bool, failOnMissingFile bool, pollIntervall time.Duration, logger simpleLogger) Tailer {
seek, offset, whence := getSeekArgs(readall)
makeWatcher := func(abspath string, _ *File) (Watcher, error) {
return NewPollingWatcher(abspath, pollIntervall)
}
return runFileTailer(path, failOnMissingFile, logger, seek, offset, whence, makeWatcher)
}

func RunPollingFileTailerWithSeek(path string, failOnMissingFile bool, pollIntervall time.Duration, logger simpleLogger, offset int64, whence int) Tailer {
makeWatcher := func(abspath string, _ *File) (Watcher, error) {
return NewPollingWatcher(abspath, pollIntervall)
}
return runFileTailer(path, readall, failOnMissingFile, logger, makeWatcher)
return runFileTailer(path, failOnMissingFile, logger, true, offset, whence, makeWatcher)
}

func runFileTailer(path string, readall bool, failOnMissingFile bool, logger simpleLogger, makeWatcher func(string, *File) (Watcher, error)) Tailer {
func runFileTailer(path string, failOnMissingFile bool, logger simpleLogger, seek bool, seekOffset int64, seekWhence int, makeWatcher func(string, *File) (Watcher, error)) Tailer {
if logger == nil {
logger = &nilLogger{}
}
Expand All @@ -71,7 +92,7 @@ func runFileTailer(path string, readall bool, failOnMissingFile bool, logger sim
closed: false,
}

file, abspath, err := openLogfile(path, readall, failOnMissingFile) // file may be nil if failOnMissingFile is false and the file doesn't exist yet.
file, abspath, err := openLogfile(path, failOnMissingFile, seek, seekOffset, seekWhence) // file may be nil if failOnMissingFile is false and the file doesn't exist yet.
if err != nil {
go func(err error) {
writeError(errors, done, err, "failed to initialize file system watcher for %v", path)
Expand Down Expand Up @@ -173,7 +194,7 @@ func runFileTailer(path string, readall bool, failOnMissingFile bool, logger sim
}

// may return *File == nil if the file does not exist and failOnMissingFile == false
func openLogfile(path string, readall bool, failOnMissingFile bool) (*File, string, error) {
func openLogfile(path string, failOnMissingFile bool, seek bool, seekOffset int64, seekWhence int) (*File, string, error) {
abspath, err := filepath.Abs(path)
if err != nil {
return nil, "", err
Expand All @@ -182,8 +203,8 @@ func openLogfile(path string, readall bool, failOnMissingFile bool) (*File, stri
if err != nil && (failOnMissingFile || !os.IsNotExist(err)) {
return nil, "", err
}
if !readall && file != nil {
_, err = file.Seek(0, io.SeekEnd)
if seek && file != nil {
_, err = file.Seek(seekOffset, seekWhence)
if err != nil {
if file != nil {
file.Close()
Expand Down
29 changes: 29 additions & 0 deletions tailer/fileTailer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package tailer

import (
"fmt"
"io"
"io/ioutil"
"os"
"os/user"
Expand Down Expand Up @@ -211,6 +212,34 @@ func TestFileMissingOnStartup(t *testing.T) {
expect(t, log, tail.Lines(), "test line 2", 1*time.Second)
}

func TestFileSeek(t *testing.T) {
const logfileName = "grok_exporter_seek_logfile.log"
log := NewTestRunLogger(400)
tmpDir := mkTmpDirOrFail(t)
defer cleanUp(t, tmpDir)
var logfile = fmt.Sprintf("%s%c%s", tmpDir, os.PathSeparator, logfileName)

logFileWriter := newLogFileWriter(t, logfile, closeFileAfterEachLine)
logFileWriter.writeLine(t, log, "test line 1")
logFileWriter.writeLine(t, log, "test line 2")

tail := RunFseventFileTailerWithSeek(logfile, true, log, 12, io.SeekStart)
defer tail.Close()

// We don't expect errors. However, start a go-routine listening on
// the tailer's errorChannel in case something goes wrong.
go func() {
for err := range tail.Errors() {
t.Errorf("Tailer failed: %v", err.Error()) // Cannot call t.Fatalf() in other goroutine.
}
}()

expect(t, log, tail.Lines(), "test line 2", 1*time.Second)

logFileWriter.writeLine(t, log, "test line 3")
expect(t, log, tail.Lines(), "test line 3", 1*time.Second)
}

func TestShutdownDuringSyscall(t *testing.T) {
runTestShutdown(t, "shutdown while the watcher is hanging in the blocking kevent() or syscall.Read() call")
}
Expand Down