Skip to content

Commit

Permalink
Use different segment sizes per op. (#147)
Browse files Browse the repository at this point in the history
In extreme cases where operation types have a big difference in execution speed — for instance,
a 2-hour PUT versus a 20-second DELETE — the automatic segment duration would select a value
unsuitable for the slowest operation.

In this case, a segment size of 15s would be chosen, and there would be no valuable information
in the DELETE segment.

Use per-type segment selection.

Bonus: calculate mixed endpoints concurrently.
  • Loading branch information
klauspost authored Nov 13, 2020
1 parent 4a62e4c commit 57d6ddc
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 36 deletions.
10 changes: 7 additions & 3 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type Server struct {
status BenchmarkStatus
ops bench.Operations
agrr *aggregate.Aggregated
aggrDur time.Duration
server *http.Server
cmdLine string

Expand Down Expand Up @@ -165,16 +166,19 @@ func (s *Server) handleAggregated(w http.ResponseWriter, req *http.Request) {
w.Write([]byte(err.Error()))
return
}

durFn := func(total time.Duration) time.Duration {
return segmentDur
}
s.mu.Lock()
if s.ops == nil {
s.mu.Unlock()
w.WriteHeader(404)
return
}
if s.agrr == nil || !s.agrr.HasDuration(segmentDur) {
aggr := aggregate.Aggregate(s.ops, segmentDur, 0)
if s.agrr == nil || s.aggrDur != segmentDur {
aggr := aggregate.Aggregate(s.ops, durFn, 0)
s.agrr = &aggr
s.aggrDur = segmentDur
}
// Copy
aggregated := *s.agrr
Expand Down
9 changes: 7 additions & 2 deletions cli/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,13 @@ func printAnalysis(ctx *cli.Context, o bench.Operations) {
if wantOp := ctx.String("analyze.op"); wantOp != "" {
o = o.FilterByOp(wantOp)
}

aggr := aggregate.Aggregate(o, analysisDur(ctx, o.Duration()), ctx.Duration("analyze.skip"))
durFn := func(total time.Duration) time.Duration {
if total <= 0 {
return 0
}
return analysisDur(ctx, total)
}
aggr := aggregate.Aggregate(o, durFn, ctx.Duration("analyze.skip"))
if wrSegs != nil {
for _, ops := range aggr.Operations {
writeSegs(ctx, wrSegs, o.FilterByOp(ops.Type), aggr.Mixed, details)
Expand Down
70 changes: 39 additions & 31 deletions pkg/aggregate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ type Aggregated struct {
// MixedServerStats and MixedThroughputByHost is populated only when data is mixed.
MixedServerStats *Throughput `json:"mixed_server_stats,omitempty"`
MixedThroughputByHost map[string]Throughput `json:"mixed_throughput_by_host,omitempty"`
// segmentDur records the duration used for segmenting the data.
segmentDur time.Duration
}

// Operation returns statistics for a single operation type.
Expand Down Expand Up @@ -69,8 +67,11 @@ type Operation struct {
ThroughputByHost map[string]Throughput `json:"throughput_by_host"`
}

// SegmentDurFn accepts the total time span covered by a set of operations
// and returns the duration to use for each analysis segment of that set.
// Aggregate calls it once per operation type (and once for the mixed total),
// allowing a different segment size per type — e.g. long PUT runs can use
// larger segments than short DELETE runs.
// NOTE(review): callers appear to return 0 when the total is non-positive
// (see the closure in printAnalysis); confirm how Aggregate treats a zero
// segment duration before relying on that.
type SegmentDurFn func(total time.Duration) time.Duration

// Aggregate returns statistics when only a single operation was running concurrently.
func Aggregate(o bench.Operations, segmentDur, skipDur time.Duration) Aggregated {
func Aggregate(o bench.Operations, dFn SegmentDurFn, skipDur time.Duration) Aggregated {
o.SortByStartTime()
types := o.OpTypes()
a := Aggregated{
Expand All @@ -79,19 +80,22 @@ func Aggregate(o bench.Operations, segmentDur, skipDur time.Duration) Aggregated
Operations: nil,
MixedServerStats: nil,
MixedThroughputByHost: nil,
segmentDur: segmentDur,
}
isMixed := o.IsMixed()
// Fill mixed only parts...
if isMixed {
a.Mixed = true
a.Type = "mixed"
ops := o.FilterInsideRange(o.ActiveTimeRange(true))
total := ops.Total(false)
o.SortByStartTime()
start, end := o.ActiveTimeRange(true)
start.Add(skipDur)
total := o.FilterInsideRange(start, end).Total(false)
a.MixedServerStats = &Throughput{}
a.MixedServerStats.fill(total)

segmentDur := dFn(total.Duration())
segs := o.Segment(bench.SegmentOptions{
From: time.Time{},
From: start.Add(skipDur),
PerSegDuration: segmentDur,
AllThreads: true,
MultiOp: true,
Expand Down Expand Up @@ -160,7 +164,7 @@ func Aggregate(o bench.Operations, segmentDur, skipDur time.Duration) Aggregated
a.Skipped = true
return
}

segmentDur := dFn(ops.Duration())
segs := ops.Segment(bench.SegmentOptions{
From: time.Time{},
PerSegDuration: segmentDur,
Expand Down Expand Up @@ -190,35 +194,39 @@ func Aggregate(o bench.Operations, segmentDur, skipDur time.Duration) Aggregated

eps := ops.Endpoints()
a.ThroughputByHost = make(map[string]Throughput, len(eps))
var epMu sync.Mutex
var epWg sync.WaitGroup
epWg.Add(len(eps))
for _, ep := range eps {
// Use all ops to include errors.
ops := allOps.FilterByEndpoint(ep)
total := ops.Total(false)
var host Throughput
host.fill(total)

segs := ops.Segment(bench.SegmentOptions{
From: time.Time{},
PerSegDuration: segmentDur,
AllThreads: false,
})

if len(segs) > 1 {
host.Segmented = &ThroughputSegmented{
SegmentDurationMillis: durToMillis(segmentDur),
go func(ep string) {
defer epWg.Done()
// Use all ops to include errors.
ops := allOps.FilterByEndpoint(ep)
total := ops.Total(false)
var host Throughput
host.fill(total)

segs := ops.Segment(bench.SegmentOptions{
From: time.Time{},
PerSegDuration: segmentDur,
AllThreads: false,
})

if len(segs) > 1 {
host.Segmented = &ThroughputSegmented{
SegmentDurationMillis: durToMillis(segmentDur),
}
host.Segmented.fill(segs, total)
}
host.Segmented.fill(segs, total)
}
a.ThroughputByHost[ep] = host
epMu.Lock()
a.ThroughputByHost[ep] = host
epMu.Unlock()
}(ep)
}
epWg.Wait()
}(i)
}
wg.Wait()
a.Operations = res
return a
}

// HasDuration returns whether the aggregation has been created with the specified duration.
func (a Aggregated) HasDuration(d time.Duration) bool {
return a.segmentDur == d
}

0 comments on commit 57d6ddc

Please sign in to comment.