chore(metastore): Update metastores after Flushing a dataobj #15883

Merged: 34 commits, Jan 28, 2025
Commits (34)
1c694ba  chore(dataobj): reimplement logs section sorting (rfratto, Jan 20, 2025)
b4278d6  Merge branch 'dataobj-compression-ratio-and-final-size' into dataobj-… (rfratto, Jan 20, 2025)
9fd28d1  Merge branch 'dataobj-compression-ratio-and-final-size' into dataobj-… (rfratto, Jan 20, 2025)
223df08  Merge branch 'dataobj-logs-sort' into dataobj-consumer (rfratto, Jan 20, 2025)
3769350  Revert "Merge branch 'dataobj-logs-sort' into dataobj-consumer" (rfratto, Jan 20, 2025)
e68af68  Merge branch 'main' into dataobj-consumer (rfratto, Jan 21, 2025)
a643a04  chore(dataobj): reintroduce sorting [wip] (rfratto, Jan 21, 2025)
7997dcb  Merge branch 'dataobj-log-batches' into dataobj-consumer (rfratto, Jan 21, 2025)
13b6404  chore(dataobj): pool zstd readers (rfratto, Jan 21, 2025)
684010d  chore(dataobj): compress intermediate stripes faster (rfratto, Jan 21, 2025)
af79870  chore(dataobj): pass compression options (rfratto, Jan 21, 2025)
3bfef7d  chore(dataobj): stop mergeStripes iter on return (rfratto, Jan 22, 2025)
aad0c8e  chore(datobj): stop pull iter on defer in dataset.Iter (rfratto, Jan 22, 2025)
d3f7d90  chore(dataobj): fix memory leak in slice usage (rfratto, Jan 22, 2025)
0ad0d5e  chore(dataobj): provide package for bucketed buffer pools (rfratto, Jan 22, 2025)
ecb5d86  Build and update metastore objects on flush (cyriltovena, Jan 16, 2025)
3fe3fd4  Sort labels when recreating Builder (benclive, Jan 22, 2025)
226f515  Instrument & improve interface (benclive, Jan 22, 2025)
b1825d7  method names and error cases (benclive, Jan 22, 2025)
1800951  naming (benclive, Jan 22, 2025)
3e59b90  Optimize metastore builder: Reuse builder, efficiently pass labels, r… (benclive, Jan 23, 2025)
16c29be  Update to latest objstore fork (benclive, Jan 23, 2025)
d1c8ee2  Set correct size of buffers (benclive, Jan 24, 2025)
82c2c31  Merge branch 'dataobj-consumer' into dataobj-comsumer-metastore (benclive, Jan 24, 2025)
1b4633a  cleanup (benclive, Jan 24, 2025)
b978510  cleanup merge artifacts (benclive, Jan 24, 2025)
8188ee7  Refactor (benclive, Jan 24, 2025)
e49df95  Adjust memory usage (benclive, Jan 24, 2025)
d3eaabb  syntax (benclive, Jan 24, 2025)
ee473a0  Lazy initialise the builder (benclive, Jan 24, 2025)
0b5be06  Fix init bug (benclive, Jan 24, 2025)
80ee08e  Only store dataobj min/max timestamp & path in metastore (benclive, Jan 27, 2025)
abcf9c7  adjust optimizations (benclive, Jan 27, 2025)
a829763  remove unused func (benclive, Jan 27, 2025)
2 changes: 2 additions & 0 deletions go.mod
@@ -408,3 +408,5 @@ replace github.com/grafana/loki/pkg/push => ./pkg/push

// leodido fork his project to continue support
replace github.com/influxdata/go-syslog/v3 => github.com/leodido/go-syslog/v4 v4.2.0

replace github.com/thanos-io/objstore => github.com/benclive/objstore v0.0.0-20250122170312-3f71d73c03dd
4 changes: 2 additions & 2 deletions go.sum
@@ -228,6 +228,8 @@ github.com/baidubce/bce-sdk-go v0.9.214 h1:bsVfwMh/emI6vreEveUEq9xAr6xtHLycTAGy2
github.com/baidubce/bce-sdk-go v0.9.214/go.mod h1:zbYJMQwE4IZuyrJiFO8tO8NbtYiKTFTbwh4eIsqjVdg=
github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3 h1:6df1vn4bBlDDo4tARvBm7l6KA9iVMnE3NWizDeWSrps=
github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3/go.mod h1:CIWtjkly68+yqLPbvwwR/fjNJA/idrtULjZWh2v1ys0=
github.com/benclive/objstore v0.0.0-20250122170312-3f71d73c03dd h1:EtsJOLTb14nQwPSfVDj6b101VhSj25EyUEW9mwJ4X9M=
github.com/benclive/objstore v0.0.0-20250122170312-3f71d73c03dd/go.mod h1:Quz9HUDjGidU0RQpoytzK4KqJ7kwzP+DMAm4K57/usM=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
@@ -1124,8 +1126,6 @@ github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203 h1:QVqDTf3h2WHt08Yu
github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203/go.mod h1:oqN97ltKNihBbwlX8dLpwxCl3+HnXKV/R0e+sRLd9C8=
github.com/tencentyun/cos-go-sdk-v5 v0.7.40 h1:W6vDGKCHe4wBACI1d2UgE6+50sJFhRWU4O8IB2ozzxM=
github.com/tencentyun/cos-go-sdk-v5 v0.7.40/go.mod h1:4dCEtLHGh8QPxHEkgq+nFaky7yZxQuYwgSJM87icDaw=
github.com/thanos-io/objstore v0.0.0-20250115091151-a54d0f04b42a h1:wFBHAmtq1tOLPFaiC4LozyG/BzkRa3ZTmVv1KujUNqk=
github.com/thanos-io/objstore v0.0.0-20250115091151-a54d0f04b42a/go.mod h1:Quz9HUDjGidU0RQpoytzK4KqJ7kwzP+DMAm4K57/usM=
github.com/tinylib/msgp v1.1.5/go.mod h1:eQsjooMTnV42mHu917E26IogZ2930nFyBQdofk10Udg=
github.com/tklauser/go-sysconf v0.3.13 h1:GBUpcahXSpR2xN01jhkNAbTLRk2Yzgggk8IM08lq3r4=
github.com/tklauser/go-sysconf v0.3.13/go.mod h1:zwleP4Q4OehZHGn4CYZDipCgg9usW5IJePewFCGVEa0=
50 changes: 32 additions & 18 deletions pkg/dataobj/consumer/partition_processor.go
@@ -15,6 +15,7 @@
"github.com/twmb/franz-go/pkg/kgo"

"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/dataobj/metastore"
"github.com/grafana/loki/v3/pkg/kafka"
)

@@ -30,10 +31,10 @@
decoder *kafka.Decoder

// Builder initialization
builderOnce sync.Once
builderCfg dataobj.BuilderConfig
bucket objstore.Bucket

builderOnce sync.Once
builderCfg dataobj.BuilderConfig
bucket objstore.Bucket
metastoreManager *metastore.MetastoreManager
// Metrics
metrics *partitionOffsetMetrics

@@ -60,20 +61,27 @@
level.Error(logger).Log("msg", "failed to register partition metrics", "err", err)
}

metastoreManager, err := metastore.NewMetastoreManager(bucket, tenantID, logger, reg)
if err != nil {
level.Error(logger).Log("msg", "failed to create metastore manager", "err", err)
return nil

[CI annotation, GitHub Actions / check / golangciLint] Check failure on line 67 in pkg/dataobj/consumer/partition_processor.go: lostcancel: this return statement may be reached without using the cancel var defined on line 50 (govet)
}
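A minimal sketch of one way to resolve the lostcancel finding above (hypothetical; the merged PR may have fixed it differently): call the context's cancel func before the early return so the processor's context is released.

	metastoreManager, err := metastore.NewMetastoreManager(bucket, tenantID, logger, reg)
	if err != nil {
		level.Error(logger).Log("msg", "failed to create metastore manager", "err", err)
		cancel() // release the processor context; satisfies govet's lostcancel check
		return nil
	}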

return &partitionProcessor{
client: client,
logger: log.With(logger, "topic", topic, "partition", partition),
topic: topic,
partition: partition,
records: make(chan *kgo.Record, 1000),
ctx: ctx,
cancel: cancel,
decoder: decoder,
reg: reg,
builderCfg: builderCfg,
bucket: bucket,
tenantID: []byte(tenantID),
metrics: metrics,
client: client,
logger: log.With(logger, "topic", topic, "partition", partition),
topic: topic,
partition: partition,
records: make(chan *kgo.Record, 1000),
ctx: ctx,
cancel: cancel,
decoder: decoder,
reg: reg,
builderCfg: builderCfg,
bucket: bucket,
tenantID: []byte(tenantID),
metrics: metrics,
metastoreManager: metastoreManager,
}
}

@@ -157,8 +165,9 @@
MaxBackoff: 10 * time.Second,
})

var flushResult dataobj.FlushResult
for backoff.Ongoing() {
err = p.builder.Flush(p.ctx)
flushResult, err = p.builder.Flush(p.ctx)
if err == nil {
break
}
@@ -167,6 +176,11 @@
backoff.Wait()
}

if err := p.metastoreManager.UpdateMetastore(p.ctx, flushResult); err != nil {
level.Error(p.logger).Log("msg", "failed to update metastore", "err", err)
return
}

backoff.Reset()
for backoff.Ongoing() {
err = p.client.CommitRecords(p.ctx, record)
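Taken together, the flush path added in this file reduces to the condensed sketch below. The helper name flushAndIndex and its package placement are hypothetical; the Flush and UpdateMetastore signatures come from the diffs in this PR.

package consumer // hypothetical placement

import (
	"context"
	"fmt"

	"github.com/grafana/loki/v3/pkg/dataobj"
	"github.com/grafana/loki/v3/pkg/dataobj/metastore"
)

// flushAndIndex condenses the flow above: flush the buffered data object to
// object storage, then record its path and time bounds in the metastore.
func flushAndIndex(ctx context.Context, b *dataobj.Builder, m *metastore.MetastoreManager) error {
	res, err := b.Flush(ctx) // uploads the object; returns path and min/max timestamps
	if err != nil {
		return fmt.Errorf("flushing data object: %w", err)
	}
	if err := m.UpdateMetastore(ctx, res); err != nil {
		return fmt.Errorf("updating metastore: %w", err)
	}
	return nil
}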
80 changes: 69 additions & 11 deletions pkg/dataobj/dataobj.go
@@ -9,6 +9,9 @@ import (
"errors"
"flag"
"fmt"
"io"
"sort"
"time"

"github.com/grafana/dskit/flagext"
lru "github.com/hashicorp/golang-lru/v2"
@@ -138,6 +141,11 @@
builderStateFlush
)

type FlushResult struct {
Path string
MinTimestamp, MaxTimestamp time.Time
}

// NewBuilder creates a new Builder which stores data objects for the specified
// tenant in a bucket.
//
@@ -180,6 +188,39 @@ func NewBuilder(cfg BuilderConfig, bucket objstore.Bucket, tenantID string) (*Bu
}, nil
}

// FromExisting updates this builder with content from an existing data object, replicating all the state like stream IDs and logs.
func (b *Builder) FromExisting(f io.ReadSeeker) error {
Review comment (Contributor Author): I'm not a massive fan of this method, and I think it could be made more efficient. I want to test it out before committing to any improvements here though.

if b.currentSizeEstimate > 0 {
return fmt.Errorf("builder already has data, cannot use FromExisting")
}

dec := encoding.ReadSeekerDecoder(f)

var streamIDs = make(map[int64]*labels.Labels, 32)
for result := range streams.Iter(context.Background(), dec) {
stream, err := result.Value()
if err != nil {
return err
}
sort.Sort(stream.Labels)
streamIDs[stream.ID] = &stream.Labels
}

for result := range logs.Iter(context.Background(), dec) {
record, err := result.Value()
if err != nil {
return err
}
streamLabels := streamIDs[record.StreamID]

b.streams.Record(*streamLabels, record.Timestamp)
b.logs.Append(record)
}

b.state = builderStateDirty
return nil
}
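A short usage sketch for FromExisting; the helper name, package placement, and file path are illustrative, and any io.ReadSeeker works as the argument.

package consumer // hypothetical placement

import (
	"os"

	"github.com/grafana/loki/v3/pkg/dataobj"
)

// rehydrate replays a previously flushed data object into an empty builder so
// that new appends merge with the existing content before the next Flush.
func rehydrate(path string, b *dataobj.Builder) error {
	f, err := os.Open(path) // *os.File satisfies io.ReadSeeker
	if err != nil {
		return err
	}
	defer f.Close()
	return b.FromExisting(f) // errors if the builder already holds data
}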

// Append buffers a stream to be written to a data object. Append returns an
// error if the stream labels cannot be parsed or [ErrBufferFull] if the
// builder is full.
@@ -286,15 +327,10 @@ func streamSizeEstimate(stream logproto.Stream) int {
// If Flush builds an object but fails to upload it to object storage, the
// built object is cached and can be retried. [Builder.Reset] can be called to
// discard any pending data and allow new data to be appended.
func (b *Builder) Flush(ctx context.Context) error {
switch b.state {
case builderStateEmpty:
return nil // Nothing to flush
case builderStateDirty:
if err := b.buildObject(); err != nil {
return fmt.Errorf("building object: %w", err)
}
b.state = builderStateFlush
func (b *Builder) Flush(ctx context.Context) (FlushResult, error) {
_, err := b.FlushToBuffer()
Review comment (Member): By the way, this changes the behaviour of Flush where calling Flush immediately after a successful flush will cause it to re-write the same object.

Review reply (Contributor Author): I've moved Reset back into this method now I'm returning the FlushResult summary. Does that fix the issue?

if err != nil {
return FlushResult{}, err
}

timer := prometheus.NewTimer(b.metrics.flushTime)
Expand All @@ -305,11 +341,32 @@ func (b *Builder) Flush(ctx context.Context) error {

objectPath := fmt.Sprintf("tenant-%s/objects/%s/%s", b.tenantID, sumStr[:b.cfg.SHAPrefixSize], sumStr[b.cfg.SHAPrefixSize:])
if err := b.bucket.Upload(ctx, objectPath, bytes.NewReader(b.flushBuffer.Bytes())); err != nil {
return err
return FlushResult{}, fmt.Errorf("uploading object: %w", err)
}

minTimestamp, maxTimestamp := b.streams.GetBounds()

b.Reset()
return nil

return FlushResult{
Path: objectPath,
MinTimestamp: minTimestamp,
MaxTimestamp: maxTimestamp,
}, nil
}

func (b *Builder) FlushToBuffer() (*bytes.Buffer, error) {
switch b.state {
case builderStateEmpty:
return nil, nil // Nothing to flush
case builderStateDirty:
if err := b.buildObject(); err != nil {
return nil, fmt.Errorf("building object: %w", err)
}
b.state = builderStateFlush
}

return b.flushBuffer, nil
}
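The review thread above concerns the builder's small state machine. A self-contained sketch of the intended transitions (a toy model under stated assumptions, not the PR's code; constant names mirror the ones in this file):

package main

import "fmt"

type builderState int

const (
	builderStateEmpty builderState = iota // nothing buffered
	builderStateDirty                     // appends pending; object not yet built
	builderStateFlush                     // object built; upload can be retried
)

// step mirrors the switch in FlushToBuffer: an empty builder produces
// nothing, a dirty builder builds an object first, and a builder already in
// the flush state re-serves the cached object so a failed upload can retry.
func step(s builderState) (builderState, bool) {
	switch s {
	case builderStateEmpty:
		return s, false
	case builderStateDirty:
		return builderStateFlush, true
	}
	return builderStateFlush, true
}

func main() {
	s, built := step(builderStateDirty)
	fmt.Println(s, built) // 2 true: object built, ready to upload
	// After a successful upload Flush calls Reset, returning the builder to
	// builderStateEmpty, so an immediate second Flush re-uploads nothing.
	_, built = step(builderStateEmpty)
	fmt.Println(built) // false
}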

func (b *Builder) buildObject() error {
@@ -345,6 +402,7 @@ func (b *Builder) Reset() {
b.state = builderStateEmpty
b.flushBuffer.Reset()
b.metrics.sizeEstimate.Set(0)
b.currentSizeEstimate = 0
}

// RegisterMetrics registers metrics about builder to report to reg. All
3 changes: 2 additions & 1 deletion pkg/dataobj/dataobj_test.go
@@ -83,7 +83,8 @@ func Test(t *testing.T) {
for _, entry := range streams {
require.NoError(t, builder.Append(entry))
}
require.NoError(t, builder.Flush(context.Background()))
_, err = builder.Flush(context.Background())
require.NoError(t, err)
})

t.Run("Read", func(t *testing.T) {
45 changes: 42 additions & 3 deletions pkg/dataobj/internal/sections/streams/streams.go
@@ -3,9 +3,11 @@
package streams

import (
"context"
"errors"
"fmt"
"sort"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
@@ -16,6 +18,7 @@
"github.com/grafana/loki/v3/pkg/dataobj/internal/encoding"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/streamsmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
"github.com/grafana/loki/v3/pkg/dataobj/internal/streamio"
"github.com/grafana/loki/v3/pkg/dataobj/internal/util/sliceclear"
)
@@ -33,6 +36,20 @@
Rows int // Number of rows in the stream.
}

func (s *Stream) Reset() {
s.ID = 0
s.Labels = nil
s.MinTimestamp = time.Time{}
s.MaxTimestamp = time.Time{}
s.Rows = 0
}

var streamPool = sync.Pool{
Review comment (Contributor Author): this pool added about 10% more ops in the benchmark when re-using dataobjs between metastores. In reality we won't be reusing Streams very often (once per flush), so it might not be worth keeping this pool as the Stream objects will likely be deallocated between runs. WDYT?

New: func() interface{} {
return &Stream{}
},
}
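For readers unfamiliar with the pattern, a self-contained sketch of the pool round-trip used later in this file (Get may hand back a previously used value, hence Reset before use; the trimmed Stream type here is illustrative):

package main

import (
	"fmt"
	"sync"
)

type Stream struct{ ID int64 }

func (s *Stream) Reset() { s.ID = 0 }

var streamPool = sync.Pool{
	New: func() interface{} { return &Stream{} },
}

func main() {
	s := streamPool.Get().(*Stream)
	s.Reset() // clear any state left over from a previous user
	s.ID = 42
	fmt.Println(s.ID)
	streamPool.Put(s) // hand back for reuse; s must not be used after Put
}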

// Streams tracks information about streams in a data object.
type Streams struct {
metrics *Metrics
@@ -61,10 +78,26 @@
return &Streams{
metrics: metrics,
pageSize: pageSize,
lookup: make(map[uint64][]*Stream),
lookup: make(map[uint64][]*Stream, 1024),
Review comment (Contributor Author): This yielded a 10-20% speed up over the baseline. We're not likely to have this many Streams in a metastore but it would be worth it for the logs dataobj, I think.
ordered: make([]*Stream, 0, 1024),
}
}
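A hypothetical micro-benchmark sketch for the pre-sizing claim above: supplying a capacity hint avoids repeated map growth while the first ~1024 streams are recorded.

package streams_test

import "testing"

// benchFill populates a map shaped like the lookup field above, with and
// without a capacity hint, to expose the cost of incremental map growth.
func benchFill(b *testing.B, hint int) {
	for i := 0; i < b.N; i++ {
		m := make(map[uint64][]int, hint)
		for k := uint64(0); k < 1024; k++ {
			m[k] = append(m[k], int(k))
		}
	}
}

func BenchmarkLookupUnsized(b *testing.B)  { benchFill(b, 0) }
func BenchmarkLookupPresized(b *testing.B) { benchFill(b, 1024) }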

func (s *Streams) Iter(ctx context.Context) result.Seq[Stream] {

[CI annotation, GitHub Actions / check / golangciLint] Check warning on line 86 in pkg/dataobj/internal/sections/streams/streams.go: unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)
return result.Iter(func(yield func(Stream) bool) error {
for _, stream := range s.ordered {
if !yield(*stream) {
return nil
}
}
return nil
})
}

func (s *Streams) GetBounds() (time.Time, time.Time) {
return s.globalMinTimestamp, s.globalMaxTimestamp
}
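A hedged in-package consumer sketch for the two accessors above, following the same range-then-Value pattern FromExisting uses in pkg/dataobj/dataobj.go (the helper name is hypothetical; context and time are already imported in this file):

// countStreams walks the in-memory streams and returns the section's time
// bounds alongside the stream count.
func countStreams(ctx context.Context, s *Streams) (time.Time, time.Time, int, error) {
	n := 0
	for res := range s.Iter(ctx) {
		if _, err := res.Value(); err != nil {
			return time.Time{}, time.Time{}, 0, err
		}
		n++
	}
	minT, maxT := s.GetBounds()
	return minT, maxT, n, nil
}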

// Record a stream record within the Streams section. The provided timestamp is
// used to track the minimum and maximum timestamp of a stream. The number of
// calls to Record is used to track the number of rows for a stream.
@@ -153,7 +186,11 @@
s.currentLabelsSize += len(lbl.Value)
}

newStream := &Stream{ID: s.lastID.Add(1), Labels: streamLabels}
newStream := streamPool.Get().(*Stream)
newStream.Reset()
newStream.ID = s.lastID.Add(1)
newStream.Labels = streamLabels

s.lookup[hash] = append(s.lookup[hash], newStream)
s.ordered = append(s.ordered, newStream)
s.metrics.streamCount.Inc()
@@ -187,7 +224,6 @@
func (s *Streams) EncodeTo(enc *encoding.Encoder) error {
timer := prometheus.NewTimer(s.metrics.encodeSeconds)
defer timer.ObserveDuration()
defer s.Reset()

// TODO(rfratto): handle one section becoming too large. This can happen when
// the number of columns is very wide. There are two approaches to handle
@@ -333,6 +369,9 @@
// Reset resets all state, allowing Streams to be reused.
func (s *Streams) Reset() {
s.lastID.Store(0)
for _, stream := range s.ordered {
streamPool.Put(stream)
}
clear(s.lookup)
s.ordered = sliceclear.Clear(s.ordered)
s.currentLabelsSize = 0