Skip to content

Commit

Permalink
kafka replay speed: add alert for when we miss records in Kafka (graf…
Browse files Browse the repository at this point in the history
…ana#9921)

* kafka replay speed: don't trim fetchWants

I realized that trimming `fetchWant`s can end up discarding offsets in extreme circumstances.

### How it works

If the fetchWant is so big that its size would exceed 2GiB, then we trim it. We trim it by reducing the end offset. The idea is that the next fetchWant will pick up from where this one left off.

### How it can break

We trim the `fetchWant` in `UpdateBytesPerRecord` too. `UpdateBytesPerRecord` can be invoked in `concurrentFEtchers.run` after the `fetchWant` is dispatched. In that case the next `fetchWant` would have already been calculated. And we would end up with a gap.

### Did it break?

It's hard to tell, but it's very unlikely. To reach 2GiB we would have needed to have the estimation for bytes per record be 2 MiB. While these large records are possible, they should be rare and our rolling average estimation for records size shouldn't reach it.

Signed-off-by: Dimitar Dimitrov <[email protected]>

* kafka replay speed: add alert for when we miss records in Kafka

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Restore local config

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Assert there are no missed records at the end of every test

Signed-off-by: Dimitar Dimitrov <[email protected]>

* make doc

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Fix rebase

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Add support for gaps within a Fetch

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Reword runbook

Co-authored-by: Taylor C <[email protected]>

* Update log fields

Co-authored-by: Marco Pracucci <[email protected]>

* Update docs/sources/mimir/manage/mimir-runbooks/_index.md

* Add TestFindGapsInRecords

Signed-off-by: Dimitar Dimitrov <[email protected]>

---------

Signed-off-by: Dimitar Dimitrov <[email protected]>
Co-authored-by: Taylor C <[email protected]>
Co-authored-by: Marco Pracucci <[email protected]>
  • Loading branch information
3 people authored Nov 19, 2024
1 parent da190b5 commit dc3ddfa
Show file tree
Hide file tree
Showing 11 changed files with 198 additions and 11 deletions.
2 changes: 0 additions & 2 deletions development/mimir-ingest-storage/config/mimir.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@ ingest_storage:
topic: mimir-ingest
last_produced_offset_poll_interval: 500ms
startup_fetch_concurrency: 15
startup_records_per_fetch: 2400
ongoing_fetch_concurrency: 2
ongoing_records_per_fetch: 30

ingester:
track_ingester_owned_series: true
Expand Down
4 changes: 2 additions & 2 deletions development/mimir-ingest-storage/docker-compose.jsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ std.manifestYamlDoc({
'-ingester.ring.prefix=exclusive-prefix',
'-ingest-storage.kafka.consume-from-position-at-startup=end',
'-ingest-storage.kafka.consume-from-timestamp-at-startup=0',
'-ingest-storage.kafka.ingestion-concurrency=2',
'-ingest-storage.kafka.ingestion-concurrency-batch-size=150',
'-ingest-storage.kafka.startup-fetch-concurrency=15',
'-ingest-storage.kafka.ongoing-fetch-concurrency=2',
'-ingest-storage.kafka.ingestion-concurrency-max=2',
'-ingest-storage.kafka.ingestion-concurrency-batch-size=150',
],
extraVolumes: ['.data-mimir-write-zone-c-61:/data:delegated'],
}),
Expand Down
2 changes: 1 addition & 1 deletion development/mimir-ingest-storage/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@
"command":
- "sh"
- "-c"
- "exec ./mimir -config.file=./config/mimir.yaml -target=ingester -activity-tracker.filepath=/activity/mimir-write-zone-c-61 -ingester.ring.instance-availability-zone=zone-c -ingester.ring.instance-id=ingester-zone-c-61 -ingester.partition-ring.prefix=exclusive-prefix -ingester.ring.prefix=exclusive-prefix -ingest-storage.kafka.consume-from-position-at-startup=end -ingest-storage.kafka.consume-from-timestamp-at-startup=0 -ingest-storage.kafka.ingestion-concurrency=2 -ingest-storage.kafka.ingestion-concurrency-batch-size=150 -ingest-storage.kafka.startup-fetch-concurrency=15 -ingest-storage.kafka.ongoing-fetch-concurrency=2"
- "exec ./mimir -config.file=./config/mimir.yaml -target=ingester -activity-tracker.filepath=/activity/mimir-write-zone-c-61 -ingester.ring.instance-availability-zone=zone-c -ingester.ring.instance-id=ingester-zone-c-61 -ingester.partition-ring.prefix=exclusive-prefix -ingester.ring.prefix=exclusive-prefix -ingest-storage.kafka.consume-from-position-at-startup=end -ingest-storage.kafka.consume-from-timestamp-at-startup=0 -ingest-storage.kafka.startup-fetch-concurrency=15 -ingest-storage.kafka.ongoing-fetch-concurrency=2 -ingest-storage.kafka.ingestion-concurrency-max=2 -ingest-storage.kafka.ingestion-concurrency-batch-size=150"
"depends_on":
"kafka_1":
"condition": "service_healthy"
Expand Down
19 changes: 19 additions & 0 deletions docs/sources/mimir/manage/mimir-runbooks/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1532,6 +1532,25 @@ How to **investigate**:
- If the call exists and it's waiting on a lock then there may be a deadlock.
- If the call doesn't exist then it could either mean processing is not stuck (false positive) or the `pushToStorage` wasn't called at all, and so you should investigate the callers in the code.
### MimirIngesterMissedRecordsFromKafka
This alert fires when an ingester has missed processing some records from Kafka. In other words, there has been a gap in offsets.
How it **works**:
- The ingester reads records from Kafka and processes them sequentially. It keeps track of the offset of the last record it's processed.
- Upon fetching the next batch of records, it checks if the first available record has an offset of one greater than the last processed offset. If the first available offset is larger than that, then the ingester has missed some records.
- Kafka doesn't guarantee sequential offsets. If a record has been manually deleted from Kafka or if the records have been produced in a transaction and the transaction was aborted, then there may be a gap.
- Mimir doesn't produce in transactions and does not delete records.
- When the ingester starts, it attempts to resume from the last offset it processed. If the ingester has been unavailable for long enough that the next record is already removed due to retention, then the ingester misses some records.
How to **investigate**:
- Find the offsets which were missed. The ingester logs them along with the message `there is a gap in consumed offsets`.
- Verify that there have been no deleted records in your Kafka cluster.
- Verify that the ingester hasn't been down for longer than the retention on the Kafka partition.
- Report a bug.
### MimirStrongConsistencyEnforcementFailed
This alert fires when too many read requests with strong consistency are failing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1133,6 +1133,15 @@ spec:
for: 5m
labels:
severity: critical
- alert: MimirIngesterMissedRecordsFromKafka
annotations:
message: Mimir {{ $labels.pod }} in {{ $labels.cluster }}/{{ $labels.namespace }} missed processing records from Kafka. There may be data loss.
runbook_url: https://grafana.com/docs/mimir/latest/operators-guide/mimir-runbooks/#mimiringestermissedrecordsfromkafka
expr: |
# Alert if the ingester missed some records from Kafka.
increase(cortex_ingest_storage_reader_missed_records_total[10m]) > 0
labels:
severity: critical
- alert: MimirStrongConsistencyEnforcementFailed
annotations:
message: Mimir {{ $labels.pod }} in {{ $labels.cluster }}/{{ $labels.namespace }} fails to enforce strong-consistency on read-path.
Expand Down
9 changes: 9 additions & 0 deletions operations/mimir-mixin-compiled-baremetal/alerts.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1107,6 +1107,15 @@ groups:
for: 5m
labels:
severity: critical
- alert: MimirIngesterMissedRecordsFromKafka
annotations:
message: Mimir {{ $labels.instance }} in {{ $labels.cluster }}/{{ $labels.namespace }} missed processing records from Kafka. There may be data loss.
runbook_url: https://grafana.com/docs/mimir/latest/operators-guide/mimir-runbooks/#mimiringestermissedrecordsfromkafka
expr: |
# Alert if the ingester missed some records from Kafka.
increase(cortex_ingest_storage_reader_missed_records_total[10m]) > 0
labels:
severity: critical
- alert: MimirStrongConsistencyEnforcementFailed
annotations:
message: Mimir {{ $labels.instance }} in {{ $labels.cluster }}/{{ $labels.namespace }} fails to enforce strong-consistency on read-path.
Expand Down
9 changes: 9 additions & 0 deletions operations/mimir-mixin-compiled/alerts.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1121,6 +1121,15 @@ groups:
for: 5m
labels:
severity: critical
- alert: MimirIngesterMissedRecordsFromKafka
annotations:
message: Mimir {{ $labels.pod }} in {{ $labels.cluster }}/{{ $labels.namespace }} missed processing records from Kafka. There may be data loss.
runbook_url: https://grafana.com/docs/mimir/latest/operators-guide/mimir-runbooks/#mimiringestermissedrecordsfromkafka
expr: |
# Alert if the ingester missed some records from Kafka.
increase(cortex_ingest_storage_reader_missed_records_total[10m]) > 0
labels:
severity: critical
- alert: MimirStrongConsistencyEnforcementFailed
annotations:
message: Mimir {{ $labels.pod }} in {{ $labels.cluster }}/{{ $labels.namespace }} fails to enforce strong-consistency on read-path.
Expand Down
15 changes: 15 additions & 0 deletions operations/mimir-mixin/alerts/ingest-storage.libsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,21 @@
},
},

// Alert firing is an ingester is reading from Kafka, there are buffered records to process, but processing is stuck.
{
alert: $.alertName('IngesterMissedRecordsFromKafka'),
expr: |||
# Alert if the ingester missed some records from Kafka.
increase(cortex_ingest_storage_reader_missed_records_total[%s]) > 0
||| % $.alertRangeInterval(10),
labels: {
severity: 'critical',
},
annotations: {
message: '%(product)s {{ $labels.%(per_instance_label)s }} in %(alert_aggregation_variables)s missed processing records from Kafka. There may be data loss.' % $._config,
},
},

// Alert firing if Mimir is failing to enforce strong read consistency.
{
alert: $.alertName('StrongConsistencyEnforcementFailed'),
Expand Down
49 changes: 43 additions & 6 deletions pkg/storage/ingest/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kgo"
Expand Down Expand Up @@ -227,7 +228,7 @@ type concurrentFetchers struct {
// ordering.
orderedFetches chan fetchResult

lastReturnedRecord int64
lastReturnedOffset int64
startOffsets *genericOffsetReader[int64]

// trackCompressedBytes controls whether to calculate MaxBytes for fetch requests based on previous responses' compressed or uncompressed bytes.
Expand Down Expand Up @@ -283,7 +284,7 @@ func newConcurrentFetchers(
partitionID: partition,
metrics: metrics,
minBytesWaitTime: minBytesWaitTime,
lastReturnedRecord: startOffset - 1,
lastReturnedOffset: startOffset - 1,
startOffsets: startOffsetsReader,
trackCompressedBytes: trackCompressedBytes,
maxBufferedBytesLimit: maxBufferedBytesLimit,
Expand Down Expand Up @@ -343,7 +344,7 @@ func (r *concurrentFetchers) Stop() {
r.bufferedFetchedRecords.Store(0)
r.bufferedFetchedBytes.Store(0)

level.Info(r.logger).Log("msg", "stopped concurrent fetchers", "last_returned_record", r.lastReturnedRecord)
level.Info(r.logger).Log("msg", "stopped concurrent fetchers", "last_returned_offset", r.lastReturnedOffset)
}

// Update implements fetcher
Expand All @@ -352,7 +353,7 @@ func (r *concurrentFetchers) Update(ctx context.Context, concurrency int) {
r.done = make(chan struct{})

r.wg.Add(1)
go r.start(ctx, r.lastReturnedRecord+1, concurrency)
go r.start(ctx, r.lastReturnedOffset+1, concurrency)
}

// PollFetches implements fetcher
Expand All @@ -369,12 +370,13 @@ func (r *concurrentFetchers) PollFetches(ctx context.Context) (kgo.Fetches, cont
// PollFetches() calls).
r.bufferedFetchedRecords.Sub(int64(len(f.Records)))

firstUnreturnedRecordIdx := recordIndexAfterOffset(f.Records, r.lastReturnedRecord)
firstUnreturnedRecordIdx := recordIndexAfterOffset(f.Records, r.lastReturnedOffset)
r.recordOrderedFetchTelemetry(f, firstUnreturnedRecordIdx, waitStartTime)

f.Records = f.Records[firstUnreturnedRecordIdx:]
if len(f.Records) > 0 {
r.lastReturnedRecord = f.Records[len(f.Records)-1].Offset
instrumentGaps(findGapsInRecords(f.Records, r.lastReturnedOffset), r.metrics.missedRecords, r.logger)
r.lastReturnedOffset = f.Records[len(f.Records)-1].Offset
}

return kgo.Fetches{{
Expand All @@ -388,6 +390,41 @@ func (r *concurrentFetchers) PollFetches(ctx context.Context) (kgo.Fetches, cont
}
}

func instrumentGaps(gaps []offsetRange, records prometheus.Counter, logger log.Logger) {
for _, gap := range gaps {
level.Error(logger).Log(
"msg", "there is a gap in consumed offsets; it is likely that there was data loss; see runbook for MimirIngesterMissedRecordsFromKafka",
"records_offset_gap_start_inclusive", gap.start,
"records_offset_gap_end_exclusive", gap.end,
)
records.Add(float64(gap.numOffsets()))
level.Error(logger).Log("msg", "found gap in records", "start", gap.start, "end", gap.end)
}
}

type offsetRange struct {
// start is inclusive
start int64

// end is exclusive
end int64
}

func (g offsetRange) numOffsets() int64 {
return g.end - g.start
}

func findGapsInRecords(records []*kgo.Record, lastReturnedOffset int64) []offsetRange {
var gaps []offsetRange
for _, r := range records {
if r.Offset != lastReturnedOffset+1 {
gaps = append(gaps, offsetRange{start: lastReturnedOffset + 1, end: r.Offset})
}
lastReturnedOffset = r.Offset
}
return gaps
}

func recordIndexAfterOffset(records []*kgo.Record, offset int64) int {
for i, r := range records {
if r.Offset > offset {
Expand Down
86 changes: 86 additions & 0 deletions pkg/storage/ingest/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"errors"
"fmt"
"math"
"strings"
"sync"
"testing"
"time"
Expand All @@ -17,6 +18,7 @@ import (
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/test"
"github.com/prometheus/client_golang/prometheus"
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kerr"
Expand Down Expand Up @@ -1149,6 +1151,15 @@ func createConcurrentFetchers(ctx context.Context, t *testing.T, client *kgo.Cli
reg := prometheus.NewPedanticRegistry()
metrics := newReaderMetrics(partition, reg, noopReaderMetricsSource{})

t.Cleanup(func() {
// Assuming none of the tests intentionally create gaps in offsets, there should be no missed records.
assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_ingest_storage_reader_missed_records_total The number of offsets that were never consumed by the reader because they weren't fetched.
# TYPE cortex_ingest_storage_reader_missed_records_total counter
cortex_ingest_storage_reader_missed_records_total 0
`), "cortex_ingest_storage_reader_missed_records_total"))
})

// This instantiates the fields of kprom.
// This is usually done by franz-go, but since now we use the metrics ourselves, we need to instantiate the metrics ourselves.
metrics.kprom.OnNewClient(client)
Expand Down Expand Up @@ -1322,3 +1333,78 @@ func TestFetchWant_UpdateBytesPerRecord(t *testing.T) {
})
}
}

func TestFindGapsInRecords(t *testing.T) {
tests := map[string]struct {
records []*kgo.Record
lastReturnedOffset int64
want []offsetRange
}{
"no gaps": {
records: []*kgo.Record{
{Offset: 1},
{Offset: 2},
{Offset: 3},
},
lastReturnedOffset: 0,
want: nil,
},
"single gap": {
records: []*kgo.Record{
{Offset: 5},
},
lastReturnedOffset: 2,
want: []offsetRange{
{start: 3, end: 5},
},
},
"multiple gaps": {
records: []*kgo.Record{
{Offset: 3},
{Offset: 7},
{Offset: 10},
},
lastReturnedOffset: 1,
want: []offsetRange{
{start: 2, end: 3},
{start: 4, end: 7},
{start: 8, end: 10},
},
},
"empty records": {
records: []*kgo.Record{},
lastReturnedOffset: 5,
want: nil,
},
"gap at start": {
records: []*kgo.Record{
{Offset: 10},
{Offset: 11},
},
lastReturnedOffset: 5,
want: []offsetRange{
{start: 6, end: 10},
},
},
"gap at start and middle": {
records: []*kgo.Record{
{Offset: 10},
{Offset: 11},
{Offset: 15},
{Offset: 16},
},
lastReturnedOffset: 5,
want: []offsetRange{
{start: 6, end: 10},
{start: 12, end: 15},
},
},
}

for name, tc := range tests {
t.Run(name, func(t *testing.T) {
got := findGapsInRecords(tc.records, tc.lastReturnedOffset)
assert.Equal(t, tc.want, got)
})
}
}
5 changes: 5 additions & 0 deletions pkg/storage/ingest/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1007,6 +1007,7 @@ type readerMetrics struct {
lastConsumedOffset prometheus.Gauge
consumeLatency prometheus.Histogram
kprom *kprom.Metrics
missedRecords prometheus.Counter
}

type readerMetricsSource interface {
Expand Down Expand Up @@ -1083,6 +1084,10 @@ func newReaderMetrics(partitionID int32, reg prometheus.Registerer, metricsSourc
strongConsistencyInstrumentation: NewStrongReadConsistencyInstrumentation[struct{}](component, reg),
lastConsumedOffset: lastConsumedOffset,
kprom: NewKafkaReaderClientMetrics(component, reg),
missedRecords: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "cortex_ingest_storage_reader_missed_records_total",
Help: "The number of offsets that were never consumed by the reader because they weren't fetched.",
}),
}

m.Service = services.NewTimerService(100*time.Millisecond, nil, func(context.Context) error {
Expand Down

0 comments on commit dc3ddfa

Please sign in to comment.