Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ns/202 standard oklog format #210

Open
wants to merge 20 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 124 additions & 0 deletions turbo/logging/async_buffered_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package logging

import (
"bufio"
"context"
"io"
"sync"
"time"

"github.com/ledgerwatch/log/v3"
)

const (
_BufferSize = 1 << 18 // 256 KiB
_FlushInterval = time.Second * 2
)

type AsyncBufferedWriter struct {
Size int
FlushInterval time.Duration
ctx context.Context

mu sync.Mutex

bufferedWriter *bufio.Writer
ticker *time.Ticker
done chan struct{}
stop chan struct{}

initialized bool
stopped bool
}

func AsyncHandler(wr io.Writer, format log.Format, ctx context.Context) log.Handler {
asyncBufferedWriter := &AsyncBufferedWriter{ctx: ctx}
asyncBufferedWriter.initialize(wr)

h := log.FuncHandler(func(r *log.Record) error {
_, err := asyncBufferedWriter.write(format.Format(r))
return err
})
return h
}

func (s *AsyncBufferedWriter) initialize(wr io.Writer) {
if s.initialized {
return
}

s.Size = _BufferSize
s.FlushInterval = _FlushInterval

s.ticker = time.NewTicker(s.FlushInterval)
s.bufferedWriter = bufio.NewWriterSize(wr, s.Size)

s.done = make(chan struct{})
s.stop = make(chan struct{})

s.initialized = true

go s.flushLoop()
}

func (s *AsyncBufferedWriter) write(b []byte) (int, error) {
s.mu.Lock()
defer s.mu.Unlock()

if len(b) >= s.bufferedWriter.Available() && s.bufferedWriter.Buffered() > 0 {
if err := s.bufferedWriter.Flush(); err != nil {
return 0, err
}
}

return s.bufferedWriter.Write(b)
}

func (s *AsyncBufferedWriter) flush() error {
s.mu.Lock()
defer s.mu.Unlock()

return s.bufferedWriter.Flush()
}

func (s *AsyncBufferedWriter) flushLoop() {
defer close(s.done)
for {
select {
case <-s.ticker.C:
_ = s.flush()
case <-s.stop:
return
case <-s.ctx.Done():
s.Stop()
}
}
}

func (s *AsyncBufferedWriter) Stop() (err error) {
var stopped bool
func() {
s.mu.Lock()
defer s.mu.Unlock()

if !s.initialized {
return
}
stopped = s.stopped

if stopped {
return
}

s.stopped = true

s.ticker.Stop()
close(s.stop)
<-s.done
}()

if !stopped {
err = s.bufferedWriter.Flush()
}
return err
}
6 changes: 6 additions & 0 deletions turbo/logging/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ var (
Name: "log.delays",
Usage: "Enable block delay logging",
}

LogAsyncFlag = cli.BoolFlag{
Name: "log.async",
Usage: "Enable async logging",
}
)

var Flags = []cli.Flag{
Expand All @@ -69,4 +74,5 @@ var Flags = []cli.Flag{
&LogDirPrefixFlag,
&LogDirVerbosityFlag,
&LogBlockDelayFlag,
&LogAsyncFlag,
}
41 changes: 30 additions & 11 deletions turbo/logging/logging.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
package logging

import (
"context"
"flag"
"fmt"
"os"
"path/filepath"
"strconv"

"github.com/ledgerwatch/erigon-lib/common/metrics"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This format of code should be adjusted.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okay, I changed the ordering of imports

"github.com/ledgerwatch/log/v3"
"github.com/spf13/cobra"
"github.com/urfave/cli/v2"
"gopkg.in/natefinch/lumberjack.v2"

"github.com/ledgerwatch/erigon-lib/common/metrics"
)

// Determine the log dir path based on the given urfave context
Expand Down Expand Up @@ -41,6 +42,7 @@ func SetupLoggerCtx(filePrefix string, ctx *cli.Context,
consoleDefaultLevel log.Lvl, dirDefaultLevel log.Lvl, rootHandler bool) log.Logger {
var consoleJson = ctx.Bool(LogJsonFlag.Name) || ctx.Bool(LogConsoleJsonFlag.Name)
var dirJson = ctx.Bool(LogDirJsonFlag.Name)
var asyncLogging = ctx.Bool(LogAsyncFlag.Name)

metrics.DelayLoggingEnabled = ctx.Bool(LogBlockDelayFlag.Name)

Expand Down Expand Up @@ -79,7 +81,7 @@ func SetupLoggerCtx(filePrefix string, ctx *cli.Context,
logger = log.New()
}

initSeparatedLogging(logger, filePrefix, dirPath, consoleLevel, dirLevel, consoleJson, dirJson)
initSeparatedLogging(logger, filePrefix, dirPath, consoleLevel, dirLevel, consoleJson, dirJson, asyncLogging, ctx.Context)
return logger
}

Expand Down Expand Up @@ -141,7 +143,7 @@ func SetupLoggerCmd(filePrefix string, cmd *cobra.Command) log.Logger {
}
}

initSeparatedLogging(log.Root(), filePrefix, dirPath, consoleLevel, dirLevel, consoleJson, dirJson)
initSeparatedLogging(log.Root(), filePrefix, dirPath, consoleLevel, dirLevel, consoleJson, dirJson, false, cmd.Context())
return log.Root()
}

Expand Down Expand Up @@ -179,7 +181,7 @@ func SetupLogger(filePrefix string) log.Logger {
filePrefix = *logDirPrefix
}

initSeparatedLogging(log.Root(), filePrefix, *logDirPath, consoleLevel, dirLevel, consoleJson, *dirJson)
initSeparatedLogging(log.Root(), filePrefix, *logDirPath, consoleLevel, dirLevel, consoleJson, *dirJson, false, context.Background())
return log.Root()
}

Expand All @@ -193,15 +195,27 @@ func initSeparatedLogging(
consoleLevel log.Lvl,
dirLevel log.Lvl,
consoleJson bool,
dirJson bool) {
dirJson bool,
asyncLogging bool,
ctx context.Context) {

var consoleHandler log.Handler

var format log.Format

okLogFormatFunc := log.FormatFunc(OkLogV1Format)

if consoleJson {
consoleHandler = log.LvlFilterHandler(consoleLevel, log.StreamHandler(os.Stderr, log.JsonFormat()))
format = okLogFormatFunc
} else {
consoleHandler = log.LvlFilterHandler(consoleLevel, log.StderrHandler)
format = log.TerminalFormatNoColor()
}
if asyncLogging {
consoleHandler = log.LvlFilterHandler(consoleLevel, AsyncHandler(os.Stderr, format, ctx))
} else {
consoleHandler = log.LvlFilterHandler(consoleLevel, log.StreamHandler(os.Stderr, format))
}

logger.SetHandler(consoleHandler)

if len(dirPath) == 0 {
Expand All @@ -214,10 +228,9 @@ func initSeparatedLogging(
logger.Warn("failed to create log dir, console logging only")
return
}

dirFormat := log.TerminalFormatNoColor()
if dirJson {
dirFormat = log.JsonFormat()
dirFormat = okLogFormatFunc
}

lumberjack := &lumberjack.Logger{
Expand All @@ -226,11 +239,17 @@ func initSeparatedLogging(
MaxBackups: 3,
MaxAge: 28, //days
}
userLog := log.StreamHandler(lumberjack, dirFormat)
var userLog log.Handler
if asyncLogging {
userLog = AsyncHandler(lumberjack, dirFormat, ctx)
} else {
userLog = log.StreamHandler(lumberjack, dirFormat)
}

mux := log.MultiHandler(consoleHandler, log.LvlFilterHandler(dirLevel, userLog))
logger.SetHandler(mux)
logger.Info("logging to file system", "log dir", dirPath, "file prefix", filePrefix, "log level", dirLevel, "json", dirJson)
logger.Info(fmt.Sprintf("Async logging enabled: %v", asyncLogging))
}

func tryGetLogLevel(s string) (log.Lvl, error) {
Expand Down
Loading
Loading