diff --git a/api/api.go b/api/api.go index f8762d54..5ca20006 100644 --- a/api/api.go +++ b/api/api.go @@ -59,6 +59,7 @@ type Server struct { status BenchmarkStatus ops bench.Operations agrr *aggregate.Aggregated + aggrDur time.Duration server *http.Server cmdLine string @@ -165,16 +166,19 @@ func (s *Server) handleAggregated(w http.ResponseWriter, req *http.Request) { w.Write([]byte(err.Error())) return } - + durFn := func(total time.Duration) time.Duration { + return segmentDur + } s.mu.Lock() if s.ops == nil { s.mu.Unlock() w.WriteHeader(404) return } - if s.agrr == nil || !s.agrr.HasDuration(segmentDur) { - aggr := aggregate.Aggregate(s.ops, segmentDur, 0) + if s.agrr == nil || s.aggrDur != segmentDur { + aggr := aggregate.Aggregate(s.ops, durFn, 0) s.agrr = &aggr + s.aggrDur = segmentDur } // Copy aggregated := *s.agrr diff --git a/cli/analyze.go b/cli/analyze.go index 64f16dc2..dc83157d 100644 --- a/cli/analyze.go +++ b/cli/analyze.go @@ -235,8 +235,13 @@ func printAnalysis(ctx *cli.Context, o bench.Operations) { if wantOp := ctx.String("analyze.op"); wantOp != "" { o = o.FilterByOp(wantOp) } - - aggr := aggregate.Aggregate(o, analysisDur(ctx, o.Duration()), ctx.Duration("analyze.skip")) + durFn := func(total time.Duration) time.Duration { + if total <= 0 { + return 0 + } + return analysisDur(ctx, total) + } + aggr := aggregate.Aggregate(o, durFn, ctx.Duration("analyze.skip")) if wrSegs != nil { for _, ops := range aggr.Operations { writeSegs(ctx, wrSegs, o.FilterByOp(ops.Type), aggr.Mixed, details) diff --git a/pkg/aggregate/aggregate.go b/pkg/aggregate/aggregate.go index 637cb897..c98a641d 100644 --- a/pkg/aggregate/aggregate.go +++ b/pkg/aggregate/aggregate.go @@ -33,8 +33,6 @@ type Aggregated struct { // MixedServerStats and MixedThroughputByHost is populated only when data is mixed. MixedServerStats *Throughput `json:"mixed_server_stats,omitempty"` MixedThroughputByHost map[string]Throughput `json:"mixed_throughput_by_host,omitempty"` - // segmentDur records the duration used for segmenting the data. - segmentDur time.Duration } // Operation returns statistics for a single operation type. @@ -69,8 +67,11 @@ type Operation struct { ThroughputByHost map[string]Throughput `json:"throughput_by_host"` } +// SegmentDurFn accepts a total time and should return the duration used for each segment. +type SegmentDurFn func(total time.Duration) time.Duration + // Aggregate returns statistics when only a single operation was running concurrently. -func Aggregate(o bench.Operations, segmentDur, skipDur time.Duration) Aggregated { +func Aggregate(o bench.Operations, dFn SegmentDurFn, skipDur time.Duration) Aggregated { o.SortByStartTime() types := o.OpTypes() a := Aggregated{ @@ -79,19 +80,22 @@ func Aggregate(o bench.Operations, segmentDur, skipDur time.Duration) Aggregated Operations: nil, MixedServerStats: nil, MixedThroughputByHost: nil, - segmentDur: segmentDur, } isMixed := o.IsMixed() // Fill mixed only parts... if isMixed { a.Mixed = true a.Type = "mixed" - ops := o.FilterInsideRange(o.ActiveTimeRange(true)) - total := ops.Total(false) + o.SortByStartTime() + start, end := o.ActiveTimeRange(true) + start.Add(skipDur) + total := o.FilterInsideRange(start, end).Total(false) a.MixedServerStats = &Throughput{} a.MixedServerStats.fill(total) + + segmentDur := dFn(total.Duration()) segs := o.Segment(bench.SegmentOptions{ - From: time.Time{}, + From: start.Add(skipDur), PerSegDuration: segmentDur, AllThreads: true, MultiOp: true, @@ -160,7 +164,7 @@ func Aggregate(o bench.Operations, segmentDur, skipDur time.Duration) Aggregated a.Skipped = true return } - + segmentDur := dFn(ops.Duration()) segs := ops.Segment(bench.SegmentOptions{ From: time.Time{}, PerSegDuration: segmentDur, @@ -190,35 +194,39 @@ func Aggregate(o bench.Operations, segmentDur, skipDur time.Duration) Aggregated eps := ops.Endpoints() a.ThroughputByHost = make(map[string]Throughput, len(eps)) + var epMu sync.Mutex + var epWg sync.WaitGroup + epWg.Add(len(eps)) for _, ep := range eps { - // Use all ops to include errors. - ops := allOps.FilterByEndpoint(ep) - total := ops.Total(false) - var host Throughput - host.fill(total) - - segs := ops.Segment(bench.SegmentOptions{ - From: time.Time{}, - PerSegDuration: segmentDur, - AllThreads: false, - }) - - if len(segs) > 1 { - host.Segmented = &ThroughputSegmented{ - SegmentDurationMillis: durToMillis(segmentDur), + go func(ep string) { + defer epWg.Done() + // Use all ops to include errors. + ops := allOps.FilterByEndpoint(ep) + total := ops.Total(false) + var host Throughput + host.fill(total) + + segs := ops.Segment(bench.SegmentOptions{ + From: time.Time{}, + PerSegDuration: segmentDur, + AllThreads: false, + }) + + if len(segs) > 1 { + host.Segmented = &ThroughputSegmented{ + SegmentDurationMillis: durToMillis(segmentDur), + } + host.Segmented.fill(segs, total) } - host.Segmented.fill(segs, total) - } - a.ThroughputByHost[ep] = host + epMu.Lock() + a.ThroughputByHost[ep] = host + epMu.Unlock() + }(ep) } + epWg.Wait() }(i) } wg.Wait() a.Operations = res return a } - -// HasDuration returns whether the aggregation has been created with the specified duration. -func (a Aggregated) HasDuration(d time.Duration) bool { - return a.segmentDur == d -}