Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream/multiconnection: Default process reporter #1734

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/config_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ type Exchange struct {
HTTPTimeout time.Duration `json:"httpTimeout"`
HTTPUserAgent string `json:"httpUserAgent,omitempty"`
HTTPDebugging bool `json:"httpDebugging,omitempty"`
WebsocketMetricsLogging bool `json:"websocketMetricsLogging"`
WebsocketResponseCheckTimeout time.Duration `json:"websocketResponseCheckTimeout"`
WebsocketResponseMaxLimit time.Duration `json:"websocketResponseMaxLimit"`
WebsocketTrafficTimeout time.Duration `json:"websocketTrafficTimeout"`
Expand Down
119 changes: 119 additions & 0 deletions exchanges/stream/reporting.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package stream

import (
"sync"
"time"

"github.com/thrasher-corp/gocryptotrader/log"
)

// ProcessReporterManager defines an interface for managing ProcessReporter instances across connections, this will
// create a new ProcessReporter instance for each new connection reader as they spawn.
type ProcessReporterManager interface {
New(conn Connection) ProcessReporter
}

// ProcessReporter defines an interface for reporting processed data from a connection
type ProcessReporter interface {
// Report logs the processing time for a received data packet and updates metrics.
// read is the time the data was read from the connection.
// data is the raw data received from the connection.
// err is any error that occurred while processing the data.
Report(read time.Time, data []byte, err error)
// close closes the process reporter and handles any cleanup.
shazbert marked this conversation as resolved.
Show resolved Hide resolved
Close()
}

// SetProcessReportManager sets the ProcessReporterManager for the Websocket instance which will be used to create new ProcessReporter instances.
// This will track metrics for processing websocket data.
func (w *Websocket) SetProcessReportManager(m ProcessReporterManager) {
w.m.Lock()
defer w.m.Unlock()
w.processReporter = m
}

// NewDefaultProcessReporterManager returns a new defaultProcessReporterManager instance
func NewDefaultProcessReporterManager() ProcessReporterManager {
return defaultProcessReporterManager{period: time.Minute}
}

// defaultProcessReporterManager is a default implementation of ProcessReporter
type defaultProcessReporterManager struct{ period time.Duration }

// New returns a new DefaultProcessReporter instance for a connection
func (d defaultProcessReporterManager) New(conn Connection) ProcessReporter {
reporter := &defaultProcessReporter{ch: make(chan struct{})}
go reporter.collectMetrics(conn, d.period)
return reporter
}

// DefaultProcessReporter provides a thread-safe implementation of the ProcessReporter interface.
// It tracks operation metrics, including the number of operations, average processing time, and peak processing time.
type defaultProcessReporter struct {
operations int64
errors int64
totalProcessingTime time.Duration
peakProcessingTime time.Duration
peakCause []byte
ch chan struct{}
m sync.Mutex
}

// Report logs the processing time for a received data packet and updates metrics.
func (r *defaultProcessReporter) Report(read time.Time, data []byte, err error) {
processingDuration := time.Since(read)
r.m.Lock()
defer r.m.Unlock()
r.operations++
if err != nil {
r.errors++
}
r.totalProcessingTime += processingDuration
if processingDuration > r.peakProcessingTime {
r.peakProcessingTime = processingDuration
r.peakCause = data
}
}

// Close closes the process reporter
func (r *defaultProcessReporter) Close() {
r.m.Lock()
close(r.ch)
r.m.Unlock()
}

// collectMetrics runs in a separate goroutine to periodically log aggregated metrics.
func (r *defaultProcessReporter) collectMetrics(conn Connection, period time.Duration) {
if period == 0 {
panic("period duration for collecting metrics must be greater than 0")
}
timer := time.NewTimer(time.Until(time.Now().Truncate(period).Add(period)))
defer timer.Stop()

for {
select {
case <-r.ch:
return
case <-timer.C:
timer.Reset(time.Until(time.Now().Truncate(period).Add(period)))
r.m.Lock()
if r.operations > 0 {
avgOperationsPerSecond := float64(r.operations) / 60
avgProcessingTime := r.totalProcessingTime / time.Duration(r.operations)
peakTime := r.peakProcessingTime
peakCause := r.peakCause
errors := r.errors
// Reset metrics for the next interval.
r.operations, r.totalProcessingTime, r.peakProcessingTime, r.peakCause, r.errors = 0, 0, 0, nil, 0
r.m.Unlock()
if len(peakCause) > 100 {
peakCause = append(peakCause[:100], []byte("...")...)
}
// Log metrics outside of the critical section to avoid blocking other threads.
log.Debugf(log.WebsocketMgr, "Connection: %v Operations/Second: %.2f, Avg Processing/Operation: %v, Errors: %v Peak: %v Cause: %v...", conn.GetURL(), avgOperationsPerSecond, avgProcessingTime, errors, peakTime, string(peakCause))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
peakCause = append(peakCause[:100], []byte("...")...)
}
// Log metrics outside of the critical section to avoid blocking other threads.
log.Debugf(log.WebsocketMgr, "Connection: %v Operations/Second: %.2f, Avg Processing/Operation: %v, Errors: %v Peak: %v Cause: %v...", conn.GetURL(), avgOperationsPerSecond, avgProcessingTime, errors, peakTime, string(peakCause))
peakCause = append(peakCause[:100], []byte("...")...)
}
// Log metrics outside of the critical section to avoid blocking other threads.
log.Debugf(log.WebsocketMgr, "Connection: %v Operations/Second: %.2f, Avg Processing/Operation: %v, Errors: %v Peak: %v Cause: %v", conn.GetURL(), avgOperationsPerSecond, avgProcessingTime, errors, peakTime, string(peakCause))

I removed the ellipses in the main output because you add them if its > 100 and doesn't really mean much if < 100

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

} else {
r.m.Unlock()
}
}
}
}
44 changes: 44 additions & 0 deletions exchanges/stream/reporting_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package stream

import (
"context"
"testing"
"time"
)

type DummyConnection struct {
Connection
ch chan []byte
}

func (d *DummyConnection) ReadMessage() Response {
return Response{Raw: <-d.ch}
}

func (d *DummyConnection) Push(data []byte) {
d.ch <- data
}

func (d *DummyConnection) GetURL() string {
return "ws://test"
}

func ProcessWithSomeSweetLag(context.Context, []byte) error {
time.Sleep(time.Millisecond)
return nil
}

func TestDefaultProcessReporter(t *testing.T) {
t.Parallel()
w := &Websocket{}
reporterManager := defaultProcessReporterManager{period: time.Millisecond * 10}
w.SetProcessReportManager(&reporterManager)
conn := &DummyConnection{ch: make(chan []byte)}
w.Wg.Add(1)
go w.Reader(context.Background(), conn, ProcessWithSomeSweetLag)

for range 100 {
conn.Push([]byte("test"))
}
conn.Push(nil)
}
29 changes: 28 additions & 1 deletion exchanges/stream/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,14 @@ func (w *Websocket) Setup(s *WebsocketSetup) error {
w.setState(disconnectedState)

w.rateLimitDefinitions = s.RateLimitDefinitions

if s.ExchangeConfig.WebsocketMetricsLogging {
if s.UseMultiConnectionManagement {
w.processReporter = NewDefaultProcessReporterManager()
} else {
log.Warnf(log.WebsocketMgr, "%s websocket: metrics logging is only supported with multi connection management supported exchanges", w.exchangeName)
}
}
return nil
}

Expand Down Expand Up @@ -1075,14 +1083,33 @@ func (w *Websocket) checkSubscriptions(conn Connection, subs subscription.List)
// Reader reads and handles data from a specific connection
func (w *Websocket) Reader(ctx context.Context, conn Connection, handler func(ctx context.Context, message []byte) error) {
defer w.Wg.Done()
var reporter ProcessReporter
if w.processReporter != nil {
reporter = w.processReporter.New(conn)
}
for {
resp := conn.ReadMessage()

var readAt time.Time
if reporter != nil {
readAt = time.Now()
}

if resp.Raw == nil {
if reporter != nil {
reporter.Close()
}
return // Connection has been closed
}
if err := handler(ctx, resp.Raw); err != nil {

err := handler(ctx, resp.Raw)
if err != nil {
w.DataHandler <- fmt.Errorf("connection URL:[%v] error: %w", conn.GetURL(), err)
}

if reporter != nil {
reporter.Report(readAt, resp.Raw, err)
}
}
}

Expand Down
1 change: 1 addition & 0 deletions exchanges/stream/websocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type testSubKey struct {

var defaultSetup = &WebsocketSetup{
ExchangeConfig: &config.Exchange{
WebsocketMetricsLogging: true,
Features: &config.FeaturesConfig{
Enabled: config.FeaturesEnabledConfig{Websocket: true},
},
Expand Down
1 change: 1 addition & 0 deletions exchanges/stream/websocket_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ type Websocket struct {
// rateLimitDefinitions contains the rate limiters shared between Websocket and REST connections for all potential
// endpoints.
rateLimitDefinitions request.RateLimitDefinitions
processReporter ProcessReporterManager
}

// WebsocketSetup defines variables for setting up a websocket connection
Expand Down
Loading