Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adapt to protocol changes #14

Merged
merged 4 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func RootCmd() *cobra.Command {
Long: rootHelp,
Version: version,
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
RunE: func(_ *cobra.Command, _ []string) error {
endpoint, err := url.JoinPath(baseURL, "/events")
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion internal/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func runCmd() *cobra.Command {
Short: "k6 test runner and terminal-based metrics dashboard viewer",
Long: runHelp,
Version: version,
RunE: func(cmd *cobra.Command, args []string) error {
RunE: func(_ *cobra.Command, args []string) error {
return runRun(args)
},

Expand Down
1 change: 1 addition & 0 deletions internal/digest/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const (
EventTypeCumulative // EventTypeCumulative mean "cumulative" SSE event.
EventTypeStart // EventTypeStart mean "start" SSE event.
EventTypeStop // EventTypeStop mean "stop" SSE event.
EventTypeThreshold // EventTypeThreshold mean "threshold" SSE event.
EventTypeConnect // EventTypeConnect mean SSE channel connected.
EventTypeDisconnect // EventTypeDisconnect mean SSE channel disconnected.
)
Expand Down
28 changes: 16 additions & 12 deletions internal/digest/eventtype_enumer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

195 changes: 168 additions & 27 deletions internal/stream/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,24 @@ package stream

import (
"encoding/json"
"errors"
"fmt"
"sort"

"github.com/r3labs/sse/v2"
"github.com/szkiba/xk6-top/internal/digest"
)

func parse(msg *sse.Event) (*digest.Event, error) {
type parser struct {
metrics digest.Metrics
names []string
}

func newParser() *parser {
return &parser{metrics: make(digest.Metrics)}
}

func (p *parser) parse(msg *sse.Event) (*digest.Event, error) {
var (
etype digest.EventType
edata interface{}
Expand All @@ -18,56 +30,185 @@ func parse(msg *sse.Event) (*digest.Event, error) {
return nil, err
}

edata, err = unmarshalData(etype, msg.Data)
edata, err = p.unmarshalData(etype, msg.Data)
if err != nil {
return nil, err
}

return &digest.Event{Type: etype, Data: edata}, nil
}

func unmarshalData(etype digest.EventType, data []byte) (interface{}, error) {
func (p *parser) unmarshalData(etype digest.EventType, data []byte) (interface{}, error) {
switch etype {
case digest.EventTypeMetric:
target := make(digest.Metrics)
return p.parseMetric(data)

if err := json.Unmarshal(data, &target); err != nil {
return nil, err
case digest.EventTypeParam:
return p.parseParam(data)

case digest.EventTypeConfig:
return p.parseConfig(data)

case digest.EventTypeStart,
digest.EventTypeStop,
digest.EventTypeSnapshot,
digest.EventTypeCumulative:
if len(data) > 0 && data[0] == '{' {
return p.parseAggregatesLegacy(data)
}

return target, nil
return p.parseAggregates(data)

case digest.EventTypeParam:
target := new(digest.ParamData)
default:
return nil, nil //nolint:nilnil
}
}

if err := json.Unmarshal(data, target); err != nil {
return nil, err
}
func (p *parser) parseMetric(data []byte) (interface{}, error) {
target := make(digest.Metrics)

return target, nil
if err := json.Unmarshal(data, &target); err != nil {
return nil, err
}

case digest.EventTypeConfig:
target := make(digest.ConfigData)
for k, v := range target {
v.Name = k

if err := json.Unmarshal(data, &target); err != nil {
return nil, err
}
p.metrics[k] = v
}

return target, nil
names := make([]string, 0, len(p.metrics))

case digest.EventTypeStart,
digest.EventTypeStop,
digest.EventTypeSnapshot,
digest.EventTypeCumulative:
target := make(digest.Aggregates)
for name := range p.metrics {
names = append(names, name)
}

sort.Strings(names)

p.names = names

if err := json.Unmarshal(data, &target); err != nil {
return target, nil
}

func (p *parser) parseParam(data []byte) (interface{}, error) {
target := new(digest.ParamData)

if err := json.Unmarshal(data, target); err != nil {
return nil, err
}

return target, nil
}

func (p *parser) parseConfig(data []byte) (interface{}, error) {
target := make(digest.ConfigData)

if err := json.Unmarshal(data, &target); err != nil {
return nil, err
}

return target, nil
}

func (p *parser) parseAggregatesLegacy(data []byte) (interface{}, error) {
target := make(digest.Aggregates)

if err := json.Unmarshal(data, &target); err != nil {
return nil, err
}

return target, nil
}

func (p *parser) parseAggregates(data []byte) (interface{}, error) {
var samples [][]float64

if err := json.Unmarshal(data, &samples); err != nil {
return nil, err
}

target := make(digest.Aggregates)

for metricIdx := range samples {
metric, err := p.getMetric(metricIdx)
if err != nil {
return nil, err
}

agg, err := p.parseAggregate(samples[metricIdx], metric.Type)
if err != nil {
return nil, err
}

return target, nil
target[metric.Name] = agg
}

return target, nil
}

func (p *parser) getMetric(idx int) (*digest.Metric, error) {
if idx >= len(p.names) {
return nil, fmt.Errorf("%w: metric index out of range %d", errData, idx)
}

name := p.names[idx]

metric, found := p.metrics[name]
if !found {
return nil, fmt.Errorf("%w: unknown metric name %s", errData, name)
}

return metric, nil
}

func (p *parser) parseAggregate(data []float64, mt digest.MetricType) (digest.Aggregate, error) {
names := aggregateNames(mt)
if len(names) == 0 {
return nil, fmt.Errorf("%w: no metric names for type %s", errData, mt.String())
}

if len(data) != len(names) {
return nil, fmt.Errorf(
"%w: metric definition mismatch %d - %d - %s",
errData,
len(data),
len(names),
names,
)

// return nil, fmt.Errorf("%w: metric definition mismatch %s", errData, mt.String())
}

agg := make(digest.Aggregate, len(names))

for idx := range names {
agg[names[idx]] = data[idx]
}

return agg, nil
}

func aggregateNames(mtype digest.MetricType) []string {
switch mtype {
case digest.MetricTypeGauge:
return gaugeAggregateNames
case digest.MetricTypeRate:
return rateAggregateNames
case digest.MetricTypeCounter:
return counterAggregateNames
case digest.MetricTypeTrend:
return trendAggregateNames
default:
return nil, nil //nolint:nilnil
return nil
}
}

//nolint:gochecknoglobals
var (
gaugeAggregateNames = []string{"value"}
rateAggregateNames = []string{"rate"}
counterAggregateNames = []string{"count", "rate"}
trendAggregateNames = []string{"avg", "max", "med", "min", "p(90)", "p(95)", "p(99)"}
)

var errData = errors.New("invalid data")
4 changes: 3 additions & 1 deletion internal/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ func Subscribe(ctx context.Context, url string, sub chan tea.Msg) tea.Cmd {
sub <- &digest.Event{Type: digest.EventTypeDisconnect}
})

parser := newParser()

return client.SubscribeRawWithContext(ctx, func(msg *sse.Event) {
event, perr := parse(msg)
event, perr := parser.parse(msg)
if perr != nil {
sub <- perr

Expand Down
6 changes: 6 additions & 0 deletions releases/v0.2.1.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
xk6-top `v0.2.1` is here 🎉!

`v0.2.0` is a maintenance release, it does not contain new features.
The purpose of the release is to adapt to protocol changes.

From xk6-dashboard v0.7.3, the SSE protocol aggregate events have been optimized. This is an incompatible change that is addressed in this release. Older xk6-top versions do not work with newer xk6-dashboard (and k6) versions.
Loading