Skip to content

Commit

Permalink
feat: remove max delay logic use alpha with avg
Browse files Browse the repository at this point in the history
  • Loading branch information
nick-bisonai committed Sep 26, 2024
1 parent c127b90 commit 88807cb
Showing 1 changed file with 55 additions and 28 deletions.
83 changes: 55 additions & 28 deletions node/pkg/checker/ping/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import (
)

const (
DefaultMaxDelay = 100 * time.Millisecond
DefaultAlpha = 0.3
DefaultThresholdFactor = 1.3
DefaultMaxFails = 2
DefaultBufferSize = 500
DefaultReconnectInterval = 2 * time.Second
Expand Down Expand Up @@ -40,19 +41,14 @@ type PingEndpoint struct {
}

type AppConfig struct {
MaxDelay time.Duration
Endpoints []string
BufferSize int
Endpoints []string
BufferSize int
Alpha float64
ThresholdFactor float64
}

type AppOption func(*AppConfig)

func WithMaxDelay(duration time.Duration) AppOption {
return func(c *AppConfig) {
c.MaxDelay = duration
}
}

func WithEndpoints(endpoints []string) AppOption {
return func(c *AppConfig) {
c.Endpoints = endpoints
Expand All @@ -65,24 +61,42 @@ func WithResultBuffer(size int) AppOption {
}
}

func WithAlpha(alpha float64) AppOption {
return func(c *AppConfig) {
c.Alpha = alpha
}
}

func WithThresholdFactor(factor float64) AppOption {
return func(c *AppConfig) {
c.ThresholdFactor = factor
}
}

type App struct {
MaxDelay time.Duration
Endpoints []PingEndpoint
FailCount map[string]int
ResultsBuffer chan PingResult
RTTAvg map[string]float64
Alpha float64 // smoothing factor for the EMA
ThresholdFactor float64
Endpoints []PingEndpoint
ResultsBuffer chan PingResult
FailCount map[string]int
}

func (pe *PingEndpoint) run() error {
return pe.Pinger.Run()
}

func New(opts ...AppOption) (*App, error) {
app := &App{}
app := &App{
RTTAvg: make(map[string]float64),
FailCount: make(map[string]int),
}

c := AppConfig{
MaxDelay: DefaultMaxDelay,
Endpoints: GlobalEndpoints,
BufferSize: DefaultBufferSize,
Endpoints: GlobalEndpoints,
BufferSize: DefaultBufferSize,
Alpha: DefaultAlpha,
ThresholdFactor: DefaultThresholdFactor,
}

for _, opt := range opts {
Expand All @@ -91,15 +105,19 @@ func New(opts ...AppOption) (*App, error) {

app.ResultsBuffer = make(chan PingResult, c.BufferSize)

withoutPrivileged, err := strconv.ParseBool(os.Getenv("WITHOUT_PING_PRIVILEGED"))
if err != nil {
withoutPrivileged = false
}

endpoints := []PingEndpoint{}
for _, endpoint := range c.Endpoints {
pinger, err := probing.NewPinger(endpoint)
if err != nil {
return nil, err
}

withoutPriviliged, err := strconv.ParseBool(os.Getenv("WITHOUT_PING_PRIVILEGED"))
if !withoutPriviliged || err != nil {
if !withoutPrivileged || err != nil {
pinger.SetPrivileged(true)
}

Expand All @@ -114,9 +132,9 @@ func New(opts ...AppOption) (*App, error) {
endpoints = append(endpoints, PingEndpoint{endpoint, pinger})
}

app.MaxDelay = c.MaxDelay
app.Endpoints = endpoints
app.FailCount = make(map[string]int)
app.Alpha = c.Alpha
app.ThresholdFactor = c.ThresholdFactor

return app, nil
}
Expand Down Expand Up @@ -148,21 +166,31 @@ func (app *App) Start(ctx context.Context) {
}
}
time.Sleep(DefaultReconnectInterval)
continue
}

}
}(endpoint)
}

for {
select {
case <-ctx.Done():
close(app.ResultsBuffer)
return
case result := <-app.ResultsBuffer:
if result.Success && result.Delay < app.MaxDelay {
log.Debug().Any("result", result).Msg("ping success")
app.FailCount[result.Address] = 0
if result.Success {
if _, exists := app.RTTAvg[result.Address]; !exists {
app.RTTAvg[result.Address] = float64(result.Delay)
} else {
app.RTTAvg[result.Address] = app.RTTAvg[result.Address]*app.Alpha + float64(result.Delay)*(1-app.Alpha)
}

if result.Delay > time.Duration(app.ThresholdFactor*app.RTTAvg[result.Address]) {
log.Error().Any("result", result).Msg("ping failed")
app.FailCount[result.Address] += 1
} else {
log.Debug().Any("result", result).Msg("ping success")
app.FailCount[result.Address] = 0
}
} else {
log.Error().Any("result", result).Msg("failed to ping endpoint")
app.FailCount[result.Address] += 1
Expand All @@ -182,5 +210,4 @@ func (app *App) Start(ctx context.Context) {
}
}
}

}

0 comments on commit 88807cb

Please sign in to comment.