chore(metastore): Update metastores after Flushing a dataobj #15883
Conversation
Reintroduce a more efficient sorting implementation for the logs section:

- Log records across streams are accumulated into different sets of builders.
- At encode time, each stream is sorted by timestamp, and then all streams are combined and encoded into a final data set.

As the set of streams resets between encodes, column builders are pooled so they can easily be reused after an encode.
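As a rough illustration of this accumulate-then-sort-at-encode pattern (a sketch only: `Record`, `streamBuilder`, and `logsBuilder` are hypothetical stand-ins, not the actual dataobj types):

```go
// Hypothetical sketch of the accumulate-then-sort-at-encode pattern;
// Record, streamBuilder, and logsBuilder are illustrative names only.
package logs

import (
	"sort"
	"sync"
	"time"
)

type Record struct {
	StreamID  uint64
	Timestamp time.Time
	Line      string
}

type streamBuilder struct{ records []Record }

// builderPool allows builders to be reused after an encode resets the
// set of streams, as described above.
var builderPool = sync.Pool{
	New: func() any { return &streamBuilder{} },
}

type logsBuilder struct {
	streams map[uint64]*streamBuilder
}

// Append accumulates a record into the builder for its stream.
func (b *logsBuilder) Append(r Record) {
	sb, ok := b.streams[r.StreamID]
	if !ok {
		sb = builderPool.Get().(*streamBuilder)
		b.streams[r.StreamID] = sb
	}
	sb.records = append(sb.records, r)
}

// Encode sorts each stream by timestamp, combines all streams into one
// data set, and returns the per-stream builders to the pool.
func (b *logsBuilder) Encode() []Record {
	var combined []Record
	for id, sb := range b.streams {
		sort.Slice(sb.records, func(i, j int) bool {
			return sb.records[i].Timestamp.Before(sb.records[j].Timestamp)
		})
		combined = append(combined, sb.records...)
		sb.records = sb.records[:0]
		builderPool.Put(sb)
		delete(b.streams, id)
	}
	return combined
}
```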
```diff
@@ -181,6 +183,45 @@ func NewBuilder(cfg BuilderConfig, bucket objstore.Bucket, tenantID string) (*Bu
 	}, nil
 }

+// FromExisting updates this builder with content from an existing data object,
+// replicating all the state like stream IDs and logs.
+func (b *Builder) FromExisting(f io.ReadSeeker) error {
```
I'm not a massive fan of this method, and I think it could be made more efficient. I want to test it out before committing to any improvements here though.
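For context, a purely illustrative reconstruction of the read-and-replay contract the doc comment describes; `decoder`, `Stream`, `Record`, and the `Builder` fields here are hypothetical stand-ins, not the dataobj package's real types:

```go
// Purely illustrative reconstruction of FromExisting's contract.
// decoder, Stream, Record, and Builder fields are hypothetical
// stand-ins, not the dataobj package's real types.
package sketch

import (
	"fmt"
	"io"
)

type Stream struct{ ID uint64 }
type Record struct{ StreamID uint64 }

type decoder struct {
	streams []Stream
	records []Record
}

// newDecoder would parse the existing object's sections from f.
func newDecoder(f io.ReadSeeker) (*decoder, error) { return &decoder{}, nil }

type Builder struct {
	streams map[uint64]Stream
	logs    []Record
}

// FromExisting replays streams first, so stream IDs are assigned
// identically, then replays log records, leaving the builder as if it
// had produced the existing object itself.
func (b *Builder) FromExisting(f io.ReadSeeker) error {
	dec, err := newDecoder(f)
	if err != nil {
		return fmt.Errorf("opening existing object: %w", err)
	}
	for _, s := range dec.streams {
		b.streams[s.ID] = s
	}
	b.logs = append(b.logs, dec.records...)
	return nil
}
```

The efficiency concern presumably comes from this shape: every flush cycle would re-read and re-decode the full existing object before appending.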
🎉
I'm not sure about this approach long-term (data objects aren't very friendly to appends after they've already been written), but I don't want to block this moving forward.
I left a few small comments about package/API design. None of them need to be addressed right now in the prototyping phase, but they should probably be addressed before merging into main.
```go
}

b.state = builderStateFlush

func (b *Builder) Flush(ctx context.Context) (string, error) {
	_, err := b.FlushToBuffer()
```
By the way, this changes the behaviour of Flush: calling Flush immediately after a successful flush will now re-write the same object.
I've moved Reset back into this method now that I'm returning the FlushResult summary. Does that fix the issue?
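A minimal sketch of that shape, assuming a hypothetical `FlushResult` type and `objectPath` helper (the real method may differ):

```go
// Sketch: Reset happens inside Flush after a successful upload, so an
// immediate second Flush becomes a no-op rather than re-writing the
// object. FlushResult and objectPath are hypothetical names.
func (b *Builder) Flush(ctx context.Context) (FlushResult, error) {
	buf, err := b.FlushToBuffer()
	if err != nil {
		return FlushResult{}, err
	}
	path := b.objectPath()
	if err := b.bucket.Upload(ctx, path, bytes.NewReader(buf.Bytes())); err != nil {
		return FlushResult{}, err // built object is kept so the upload can be retried
	}
	res := FlushResult{Path: path}
	b.Reset() // clears buffered state; the next Flush cannot re-upload this object
	return res, nil
}
```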
```go
err = p.writeMetastores(backoff, dataobjPath)
if err != nil {
	level.Error(p.logger).Log("msg", "failed to write metastores", "err", err)
	return
}

// Reset builder after flushing & storing in metastore
p.builder.Reset()
```
It seems like a few of these changes move logic into the processor:
- Resetting the state after a flush
- Digging into the state of the builder before writing the metastore index
- Splitting out flushing to object storage and flushing to a buffer
I think we can allow the metastore builder to have access to everything it needs without changing any of the above by returning some kind of report/summary on a successful call to Flush:
```go
package dataobj

// Flush flushes all buffered data to object storage. Calling Flush can result
// in a no-op if there is no buffered data to flush.
//
// If Flush builds an object but fails to upload it to object storage, the
// built object is cached and can be retried. [Builder.Reset] can be called to
// discard any pending data and allow new data to be appended.
//
// On a successful flush, a summary describing what was flushed is included.
func (b *Builder) Flush() (Summary, error)

// A Summary summarizes the data included in a flush from a [Builder].
type Summary struct {
	ObjectPath string   // Object storage path that was flushed to.
	Streams    []Stream // Streams included in the flush.
}

// A Stream is an individual stream within a data object.
type Stream struct {
	// (Copy or subset of streams.Stream to avoid exposing internal API in an external package)
}
```
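With that API, the calling side in the processor could stay roughly as simple as the following sketch (the extra `writeMetastores` parameters are assumptions for illustration, not part of this PR):

```go
// Hypothetical caller: the processor flushes and hands the summary to the
// metastore writer instead of inspecting builder internals.
summary, err := p.builder.Flush()
if err != nil {
	level.Error(p.logger).Log("msg", "failed to flush builder", "err", err)
	return
}
if err := p.writeMetastores(backoff, summary.ObjectPath, summary.Streams); err != nil {
	level.Error(p.logger).Log("msg", "failed to write metastores", "err", err)
	return
}
```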
Ah, this is a nice idea, thank you! This logic evolved from trying to figure out what I needed from the dataobj so I never stepped back to figure out a nicer way to organise it. I'll give this a go!
```diff
@@ -184,3 +202,78 @@ func (p *partitionProcessor) processRecord(record *kgo.Record) {
 		}
 	}
 }

+func (p *partitionProcessor) writeMetastores(backoff *backoff.Backoff, dataobjPath string) error {
```
Should we have some kind of `dataobj/metastore` package that's responsible for building and operating on metastores?
yes.
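One possible shape for such a package, purely illustrative (the `Updater` name and API are assumptions, not from this PR):

```go
// Hypothetical outline of a dataobj/metastore package that owns building
// and updating metastore objects; Updater and its API are illustrative.
package metastore

import (
	"context"

	"github.com/thanos-io/objstore"
)

type Updater struct {
	bucket   objstore.Bucket
	tenantID string
}

func NewUpdater(bucket objstore.Bucket, tenantID string) *Updater {
	return &Updater{bucket: bucket, tenantID: tenantID}
}

// Update records a flushed data object in the metastore(s) covering its
// time range, keeping retry/backoff policy inside the package rather
// than in the partition processor.
func (u *Updater) Update(ctx context.Context, dataobjPath string) error {
	// Read-modify-write of the relevant metastore object(s) would go here.
	return nil
}
```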
```go
	s.Rows = 0
}

var streamPool = sync.Pool{
```
This pool added about 10% more ops in the benchmark when re-using dataobjs between metastores. In reality we won't be reusing Streams very often (once per flush), so it might not be worth keeping this pool, as the Stream objects will likely be deallocated between runs.
WDYT?
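For reference, the pattern under discussion is the standard `sync.Pool` get/put cycle around `Stream` objects, roughly as below (`getStream` and `putStream` are hypothetical helpers):

```go
// Illustrative get/put cycle for the pool discussed above. Whether the
// pool pays off depends on how often Stream objects are actually reused.
func getStream() *Stream {
	return streamPool.Get().(*Stream)
}

func putStream(s *Stream) {
	s.Rows = 0 // zero per-use state (as in the diff above) before pooling
	streamPool.Put(s)
}
```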
```diff
@@ -61,10 +78,36 @@ func New(metrics *Metrics, pageSize int) *Streams {
 	return &Streams{
 		metrics:  metrics,
 		pageSize: pageSize,
-		lookup:   make(map[uint64][]*Stream),
+		lookup:   make(map[uint64][]*Stream, 1024),
```
This yielded a 10-20% speed-up over the baseline. We're not likely to have this many Streams in a metastore, but I think it would be worth it for the logs dataobj.
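The speed-up comes from avoiding incremental map growth: the capacity hint reserves bucket space up front, e.g.:

```go
// The capacity hint pre-allocates bucket space so insertions avoid
// repeated growth and rehashing; the map still grows past 1024 entries
// if needed.
lookup := make(map[uint64][]*Stream, 1024)
```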