Skip to content

Commit

Permalink
analyzer: Fail healthcheck after some time without messages (#50)
Browse files Browse the repository at this point in the history
* core: Add EventFlowSilenceTolerance config

* healthcore: Check last event time on healthcheck

* analyzer: Update description to match new behavior

* healthcore: Use event timestamp instead of walltime

This will also make the rollout of analyzer instances
smoother, as they stay unhealthy until they get recent-enough
events that pass the tolerance test.
  • Loading branch information
victorges authored Jun 1, 2022
1 parent 8ad1c8b commit 4f6696f
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 2 deletions.
1 change: 1 addition & 0 deletions cmd/analyzer/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func parseFlags(version string) cliFlags {
fs.StringVar(&cli.streamingOpts.MaxLengthBytes, "stream-max-length", "50gb", "When creating a new stream, config for max total storage size")
fs.StringVar(&cli.streamingOpts.MaxSegmentSizeBytes, "stream-max-segment-size", "500mb", "When creating a new stream, config for max stream segment size in storage")
fs.DurationVar(&cli.streamingOpts.MaxAge, "stream-max-age", 30*24*time.Hour, `When creating a new stream, config for max age of stored events`)
fs.DurationVar(&cli.streamingOpts.EventFlowSilenceTolerance, "event-flow-silence-tolerance", 10*time.Minute, "The time to tolerate getting zero messages in the stream before giving an error on the service healthcheck")
fs.DurationVar(&cli.memoryRecordsTtl, "memory-records-ttl", 24*time.Hour, `How long to keep data records in memory about inactive streams`)

flag.Set("logtostderr", "true")
Expand Down
16 changes: 14 additions & 2 deletions health/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ type StreamingOptions struct {
Stream, ConsumerName string

event.RawStreamOptions

// EventFlowSilenceTolerance determines the amount of time to tolerate zero
// messages in the stream before giving an error on the service healthcheck.
// This is a workaround for a bug in the rabbitmq streams client in which it
// freezes when the stream has leader election issues in a clustered setup.
EventFlowSilenceTolerance time.Duration
}

type CoreOptions struct {
Expand All @@ -68,7 +74,8 @@ type Core struct {
reducer Reducer
conditionTypes []data.ConditionType

storage RecordStorage
storage RecordStorage
lastEventTs time.Time
}

func NewCore(opts CoreOptions, consumer event.StreamConsumer, reducer Reducer) *Core {
Expand All @@ -83,7 +90,11 @@ func NewCore(opts CoreOptions, consumer event.StreamConsumer, reducer Reducer) *
func (c *Core) IsHealthy() bool {
err := c.consumer.CheckConnection()
if err != nil {
glog.Warningf("Health core is unhealthy. consumerErr=%q", err)
glog.Warningf("Health core is unhealthy. reason=consumerErr consumerErr=%q", err)
return false
}
if tol := c.opts.Streaming.EventFlowSilenceTolerance; tol > 0 && time.Since(c.lastEventTs) > tol {
glog.Warningf("Health core is unhealthy. reason=noEvents lastEventTs=%s, tolerance=%s", c.lastEventTs, tol)
return false
}
return true
Expand Down Expand Up @@ -136,6 +147,7 @@ func (c *Core) HandleMessage(msg event.StreamMessage) {

func (c *Core) handleSingleEvent(evt data.Event) {
streamID, ts := evt.StreamID(), evt.Timestamp()
c.lastEventTs = ts
record := c.storage.GetOrCreate(streamID, c.conditionTypes)

record.RLock()
Expand Down
2 changes: 2 additions & 0 deletions pkg/event/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ type (
Stream string
*StreamOptions
*stream.ConsumerOptions
// Whether to memorize the message offset in the stream and use it on
// re-connections to continue from the last read message.
MemorizeOffset bool
}

Expand Down

0 comments on commit 4f6696f

Please sign in to comment.