From 37c75abaf9ad2beba0f1e48fc233e2ff23100a99 Mon Sep 17 00:00:00 2001 From: Ryan O'Hara-Reid Date: Tue, 26 Nov 2024 15:46:15 +1100 Subject: [PATCH 1/9] stream/gateio: Default process reporter (cherry-pickable to GCT library) --- exchanges/gateio/gateio_wrapper.go | 1 + exchanges/stream/reporting.go | 99 +++++++++++++++++++++++++++++ exchanges/stream/websocket.go | 12 ++++ exchanges/stream/websocket_types.go | 1 + 4 files changed, 113 insertions(+) create mode 100644 exchanges/stream/reporting.go diff --git a/exchanges/gateio/gateio_wrapper.go b/exchanges/gateio/gateio_wrapper.go index 02d2b760fa6..077b97a3103 100644 --- a/exchanges/gateio/gateio_wrapper.go +++ b/exchanges/gateio/gateio_wrapper.go @@ -180,6 +180,7 @@ func (g *Gateio) SetDefaults() { g.WebsocketResponseMaxLimit = exchange.DefaultWebsocketResponseMaxLimit g.WebsocketResponseCheckTimeout = exchange.DefaultWebsocketResponseCheckTimeout g.WebsocketOrderbookBufferLimit = exchange.DefaultWebsocketOrderbookBufferLimit + g.Websocket.SetProcessReportManager(stream.DefaultProcessReporterManager{}) } // Setup sets user configuration diff --git a/exchanges/stream/reporting.go b/exchanges/stream/reporting.go new file mode 100644 index 00000000000..51e174c3f66 --- /dev/null +++ b/exchanges/stream/reporting.go @@ -0,0 +1,99 @@ +package stream + +import ( + "sync" + "time" + + "github.com/thrasher-corp/gocryptotrader/log" +) + +// ProcessReporterManager defines an interface for managing ProcessReporter instances across connections, this will +// create a new ProcessReporter instance for each new connection reader. +type ProcessReporterManager interface { + New() ProcessReporter +} + +// DefaultProcessReporter is a default implementation of ProcessReporter +type DefaultProcessReporterManager struct{} + +// New returns a new DefaultProcessReporter instance for a connection +func (d DefaultProcessReporterManager) New() ProcessReporter { return &DefaultProcessReporter{} } + +// ProcessReporter defines an interface for reporting processed data from a connection +type ProcessReporter interface { + Report(conn Connection, read time.Time, data []byte) +} + +// DefaultProcessReporter provides a thread-safe implementation of the ProcessReporter interface. +// It tracks operation metrics, including the number of operations, average processing time, and peak processing time. +type DefaultProcessReporter struct { + operations int64 + totalProcessingTime time.Duration + peakProcessingTime time.Duration + ch chan struct{} + m sync.Mutex +} + +// Report logs the processing time for a received data packet and updates metrics. +// If `data` is nil, the reporter shuts down its metrics collection routine. +func (r *DefaultProcessReporter) Report(conn Connection, read time.Time, data []byte) { + processingDuration := time.Since(read) + + r.m.Lock() + defer r.m.Unlock() + if data == nil { + if r.ch != nil { + close(r.ch) + } + return + } + + if r.ch == nil { + r.ch = make(chan struct{}) + go r.collectMetrics(conn) + } + + r.operations++ + r.totalProcessingTime += processingDuration + if processingDuration > r.peakProcessingTime { + r.peakProcessingTime = processingDuration + } +} + +// collectMetrics runs in a separate goroutine to periodically log aggregated metrics. +func (r *DefaultProcessReporter) collectMetrics(conn Connection) { + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() + + for { + select { + case <-r.ch: + return + case <-ticker.C: + r.m.Lock() + if r.operations > 0 { + avgOperationsPerSecond := r.operations / 60 + avgProcessingTime := r.totalProcessingTime / time.Duration(r.operations) + peakTime := r.peakProcessingTime + + // Reset metrics for the next interval. + r.operations, r.totalProcessingTime, r.peakProcessingTime = 0, 0, 0 + + r.m.Unlock() + + // Log metrics outside of the critical section to avoid blocking other threads. + log.Debugf(log.WebsocketMgr, "%v: Operations/Second: %d, Avg Processing/Operation: %v, Peak: %v", conn.GetURL(), avgOperationsPerSecond, avgProcessingTime, peakTime) + } else { + r.m.Unlock() + } + } + } +} + +// SetProcessReportManager sets the ProcessReporterManager for the Websocket instance which will be used to create new ProcessReporter instances. +// This will track metrics for processing websocket data. +func (w *Websocket) SetProcessReportManager(m ProcessReporterManager) { + w.m.Lock() + defer w.m.Unlock() + w.processReporter = m +} diff --git a/exchanges/stream/websocket.go b/exchanges/stream/websocket.go index 309db9a79d4..f194b4beec7 100644 --- a/exchanges/stream/websocket.go +++ b/exchanges/stream/websocket.go @@ -1075,14 +1075,26 @@ func (w *Websocket) checkSubscriptions(conn Connection, subs subscription.List) // Reader reads and handles data from a specific connection func (w *Websocket) Reader(ctx context.Context, conn Connection, handler func(ctx context.Context, message []byte) error) { defer w.Wg.Done() + var reporter ProcessReporter + if w.processReporter != nil { + reporter = w.processReporter.New() + } for { resp := conn.ReadMessage() + readAt := time.Now() if resp.Raw == nil { + if reporter != nil { + reporter.Report(conn, readAt, nil) + } return // Connection has been closed } if err := handler(ctx, resp.Raw); err != nil { w.DataHandler <- fmt.Errorf("connection URL:[%v] error: %w", conn.GetURL(), err) } + + if reporter != nil { + reporter.Report(conn, readAt, resp.Raw) + } } } diff --git a/exchanges/stream/websocket_types.go b/exchanges/stream/websocket_types.go index 27a5c81963f..75872167f2b 100644 --- a/exchanges/stream/websocket_types.go +++ b/exchanges/stream/websocket_types.go @@ -109,6 +109,7 @@ type Websocket struct { // rateLimitDefinitions contains the rate limiters shared between Websocket and REST connections for all potential // endpoints. rateLimitDefinitions request.RateLimitDefinitions + processReporter ProcessReporterManager } // WebsocketSetup defines variables for setting up a websocket connection From 73304c1974e4d1301631a81cef9cbc58eb1afa75 Mon Sep 17 00:00:00 2001 From: Ryan O'Hara-Reid Date: Thu, 28 Nov 2024 13:24:42 +1100 Subject: [PATCH 2/9] fixes --- engine/websocketroutine_manager.go | 4 ++ exchanges/gateio/gateio_wrapper.go | 1 - exchanges/stream/reporting.go | 63 ++++++++++++++++-------------- exchanges/stream/websocket.go | 9 +++-- 4 files changed, 43 insertions(+), 34 deletions(-) diff --git a/engine/websocketroutine_manager.go b/engine/websocketroutine_manager.go index f6412d8bd0f..24e5a34b2e4 100644 --- a/engine/websocketroutine_manager.go +++ b/engine/websocketroutine_manager.go @@ -134,6 +134,10 @@ func (m *WebsocketRoutineManager) websocketRoutine() { continue } + if m.verbose { + ws.SetProcessReportManager(&stream.DefaultProcessReporterManager{}) + } + wg.Add(1) go func() { defer wg.Done() diff --git a/exchanges/gateio/gateio_wrapper.go b/exchanges/gateio/gateio_wrapper.go index 077b97a3103..02d2b760fa6 100644 --- a/exchanges/gateio/gateio_wrapper.go +++ b/exchanges/gateio/gateio_wrapper.go @@ -180,7 +180,6 @@ func (g *Gateio) SetDefaults() { g.WebsocketResponseMaxLimit = exchange.DefaultWebsocketResponseMaxLimit g.WebsocketResponseCheckTimeout = exchange.DefaultWebsocketResponseCheckTimeout g.WebsocketOrderbookBufferLimit = exchange.DefaultWebsocketOrderbookBufferLimit - g.Websocket.SetProcessReportManager(stream.DefaultProcessReporterManager{}) } // Setup sets user configuration diff --git a/exchanges/stream/reporting.go b/exchanges/stream/reporting.go index 51e174c3f66..c1bb56c97ea 100644 --- a/exchanges/stream/reporting.go +++ b/exchanges/stream/reporting.go @@ -10,18 +10,36 @@ import ( // ProcessReporterManager defines an interface for managing ProcessReporter instances across connections, this will // create a new ProcessReporter instance for each new connection reader. type ProcessReporterManager interface { - New() ProcessReporter + New(conn Connection) ProcessReporter +} + +// ProcessReporter defines an interface for reporting processed data from a connection +type ProcessReporter interface { + // Report logs the processing time for a received data packet and updates metrics. + // read is the time the data was read from the connection. + // data is the raw data received from the connection. + // err is any error that occurred while processing the data. + Report(read time.Time, data []byte, err error) + // close closes the process reporter and handles any cleanup. + Close() +} + +// SetProcessReportManager sets the ProcessReporterManager for the Websocket instance which will be used to create new ProcessReporter instances. +// This will track metrics for processing websocket data. +func (w *Websocket) SetProcessReportManager(m ProcessReporterManager) { + w.m.Lock() + defer w.m.Unlock() + w.processReporter = m } // DefaultProcessReporter is a default implementation of ProcessReporter type DefaultProcessReporterManager struct{} // New returns a new DefaultProcessReporter instance for a connection -func (d DefaultProcessReporterManager) New() ProcessReporter { return &DefaultProcessReporter{} } - -// ProcessReporter defines an interface for reporting processed data from a connection -type ProcessReporter interface { - Report(conn Connection, read time.Time, data []byte) +func (d DefaultProcessReporterManager) New(conn Connection) ProcessReporter { + reporter := &DefaultProcessReporter{ch: make(chan struct{})} + go reporter.collectMetrics(conn) + return reporter } // DefaultProcessReporter provides a thread-safe implementation of the ProcessReporter interface. @@ -35,24 +53,10 @@ type DefaultProcessReporter struct { } // Report logs the processing time for a received data packet and updates metrics. -// If `data` is nil, the reporter shuts down its metrics collection routine. -func (r *DefaultProcessReporter) Report(conn Connection, read time.Time, data []byte) { +func (r *DefaultProcessReporter) Report(read time.Time, _ []byte, _ error) { processingDuration := time.Since(read) - r.m.Lock() defer r.m.Unlock() - if data == nil { - if r.ch != nil { - close(r.ch) - } - return - } - - if r.ch == nil { - r.ch = make(chan struct{}) - go r.collectMetrics(conn) - } - r.operations++ r.totalProcessingTime += processingDuration if processingDuration > r.peakProcessingTime { @@ -60,6 +64,15 @@ func (r *DefaultProcessReporter) Report(conn Connection, read time.Time, data [] } } +// Close closes the process reporter +func (r *DefaultProcessReporter) Close() { + r.m.Lock() + defer r.m.Unlock() + if r.ch != nil { + close(r.ch) + } +} + // collectMetrics runs in a separate goroutine to periodically log aggregated metrics. func (r *DefaultProcessReporter) collectMetrics(conn Connection) { ticker := time.NewTicker(time.Minute) @@ -89,11 +102,3 @@ func (r *DefaultProcessReporter) collectMetrics(conn Connection) { } } } - -// SetProcessReportManager sets the ProcessReporterManager for the Websocket instance which will be used to create new ProcessReporter instances. -// This will track metrics for processing websocket data. -func (w *Websocket) SetProcessReportManager(m ProcessReporterManager) { - w.m.Lock() - defer w.m.Unlock() - w.processReporter = m -} diff --git a/exchanges/stream/websocket.go b/exchanges/stream/websocket.go index f194b4beec7..3b81d153032 100644 --- a/exchanges/stream/websocket.go +++ b/exchanges/stream/websocket.go @@ -1077,23 +1077,24 @@ func (w *Websocket) Reader(ctx context.Context, conn Connection, handler func(ct defer w.Wg.Done() var reporter ProcessReporter if w.processReporter != nil { - reporter = w.processReporter.New() + reporter = w.processReporter.New(conn) } for { resp := conn.ReadMessage() readAt := time.Now() if resp.Raw == nil { if reporter != nil { - reporter.Report(conn, readAt, nil) + reporter.Close() } return // Connection has been closed } - if err := handler(ctx, resp.Raw); err != nil { + err := handler(ctx, resp.Raw) + if err != nil { w.DataHandler <- fmt.Errorf("connection URL:[%v] error: %w", conn.GetURL(), err) } if reporter != nil { - reporter.Report(conn, readAt, resp.Raw) + reporter.Report(readAt, resp.Raw, err) } } } From 09ec12f5a74102bf064cbda725d6e0e79cbc65c6 Mon Sep 17 00:00:00 2001 From: shazbert Date: Thu, 5 Dec 2024 17:43:40 +1100 Subject: [PATCH 3/9] glorious: nits --- config/config_types.go | 1 + engine/websocketroutine_manager.go | 4 ---- exchanges/stream/reporting.go | 35 ++++++++++++++++++------------ exchanges/stream/websocket.go | 12 +++++++++- 4 files changed, 33 insertions(+), 19 deletions(-) diff --git a/config/config_types.go b/config/config_types.go index bc52fb6e156..29a7fee0806 100644 --- a/config/config_types.go +++ b/config/config_types.go @@ -184,6 +184,7 @@ type Exchange struct { HTTPTimeout time.Duration `json:"httpTimeout"` HTTPUserAgent string `json:"httpUserAgent,omitempty"` HTTPDebugging bool `json:"httpDebugging,omitempty"` + WebsocketMetricsLogging bool `json:"websocketMetricsLogging"` WebsocketResponseCheckTimeout time.Duration `json:"websocketResponseCheckTimeout"` WebsocketResponseMaxLimit time.Duration `json:"websocketResponseMaxLimit"` WebsocketTrafficTimeout time.Duration `json:"websocketTrafficTimeout"` diff --git a/engine/websocketroutine_manager.go b/engine/websocketroutine_manager.go index 24e5a34b2e4..f6412d8bd0f 100644 --- a/engine/websocketroutine_manager.go +++ b/engine/websocketroutine_manager.go @@ -134,10 +134,6 @@ func (m *WebsocketRoutineManager) websocketRoutine() { continue } - if m.verbose { - ws.SetProcessReportManager(&stream.DefaultProcessReporterManager{}) - } - wg.Add(1) go func() { defer wg.Done() diff --git a/exchanges/stream/reporting.go b/exchanges/stream/reporting.go index c1bb56c97ea..c412dab9b78 100644 --- a/exchanges/stream/reporting.go +++ b/exchanges/stream/reporting.go @@ -46,56 +46,63 @@ func (d DefaultProcessReporterManager) New(conn Connection) ProcessReporter { // It tracks operation metrics, including the number of operations, average processing time, and peak processing time. type DefaultProcessReporter struct { operations int64 + errors int64 totalProcessingTime time.Duration peakProcessingTime time.Duration + peakCause []byte ch chan struct{} m sync.Mutex } // Report logs the processing time for a received data packet and updates metrics. -func (r *DefaultProcessReporter) Report(read time.Time, _ []byte, _ error) { +func (r *DefaultProcessReporter) Report(read time.Time, data []byte, err error) { processingDuration := time.Since(read) r.m.Lock() defer r.m.Unlock() r.operations++ + if err != nil { + r.errors++ + } r.totalProcessingTime += processingDuration if processingDuration > r.peakProcessingTime { r.peakProcessingTime = processingDuration + r.peakCause = data } } // Close closes the process reporter func (r *DefaultProcessReporter) Close() { r.m.Lock() - defer r.m.Unlock() - if r.ch != nil { - close(r.ch) - } + close(r.ch) + r.m.Unlock() } // collectMetrics runs in a separate goroutine to periodically log aggregated metrics. func (r *DefaultProcessReporter) collectMetrics(conn Connection) { - ticker := time.NewTicker(time.Minute) - defer ticker.Stop() + timer := time.NewTimer(time.Until(time.Now().Truncate(time.Minute).Add(time.Minute))) + defer timer.Stop() for { select { case <-r.ch: return - case <-ticker.C: + case <-timer.C: + timer.Reset(time.Until(time.Now().Truncate(time.Minute).Add(time.Minute))) r.m.Lock() if r.operations > 0 { - avgOperationsPerSecond := r.operations / 60 + avgOperationsPerSecond := float64(r.operations) / 60 avgProcessingTime := r.totalProcessingTime / time.Duration(r.operations) peakTime := r.peakProcessingTime - + peakCause := r.peakCause + errors := r.errors // Reset metrics for the next interval. - r.operations, r.totalProcessingTime, r.peakProcessingTime = 0, 0, 0 - + r.operations, r.totalProcessingTime, r.peakProcessingTime, r.peakCause, r.errors = 0, 0, 0, nil, 0 r.m.Unlock() - + if len(peakCause) > 100 { + peakCause = append(peakCause[:100], []byte("...")...) + } // Log metrics outside of the critical section to avoid blocking other threads. - log.Debugf(log.WebsocketMgr, "%v: Operations/Second: %d, Avg Processing/Operation: %v, Peak: %v", conn.GetURL(), avgOperationsPerSecond, avgProcessingTime, peakTime) + log.Debugf(log.WebsocketMgr, "Connection: %v Operations/Second: %.2f, Avg Processing/Operation: %v, Errors: %v Peak: %v Cause: %v...", conn.GetURL(), avgOperationsPerSecond, avgProcessingTime, errors, peakTime, string(peakCause)) } else { r.m.Unlock() } diff --git a/exchanges/stream/websocket.go b/exchanges/stream/websocket.go index 3b81d153032..1165a635dfb 100644 --- a/exchanges/stream/websocket.go +++ b/exchanges/stream/websocket.go @@ -203,6 +203,10 @@ func (w *Websocket) Setup(s *WebsocketSetup) error { w.setState(disconnectedState) w.rateLimitDefinitions = s.RateLimitDefinitions + + if s.ExchangeConfig.WebsocketMetricsLogging { + w.processReporter = &DefaultProcessReporterManager{} + } return nil } @@ -1081,13 +1085,19 @@ func (w *Websocket) Reader(ctx context.Context, conn Connection, handler func(ct } for { resp := conn.ReadMessage() - readAt := time.Now() + + var readAt time.Time + if reporter != nil { + readAt = time.Now() + } + if resp.Raw == nil { if reporter != nil { reporter.Close() } return // Connection has been closed } + err := handler(ctx, resp.Raw) if err != nil { w.DataHandler <- fmt.Errorf("connection URL:[%v] error: %w", conn.GetURL(), err) From 0d5d8ba1fcd9b0856b92bb41f475ac1da2d1fe58 Mon Sep 17 00:00:00 2001 From: Ryan O'Hara-Reid Date: Fri, 6 Dec 2024 11:41:54 +1100 Subject: [PATCH 4/9] stream/reporter: Add tests to default reporter --- exchanges/stream/reporting.go | 32 ++++++++++++++-------- exchanges/stream/reporting_test.go | 44 ++++++++++++++++++++++++++++++ exchanges/stream/websocket.go | 2 +- exchanges/stream/websocket_test.go | 1 + 4 files changed, 66 insertions(+), 13 deletions(-) create mode 100644 exchanges/stream/reporting_test.go diff --git a/exchanges/stream/reporting.go b/exchanges/stream/reporting.go index c412dab9b78..96916355bce 100644 --- a/exchanges/stream/reporting.go +++ b/exchanges/stream/reporting.go @@ -8,7 +8,7 @@ import ( ) // ProcessReporterManager defines an interface for managing ProcessReporter instances across connections, this will -// create a new ProcessReporter instance for each new connection reader. +// create a new ProcessReporter instance for each new connection reader as they spawn. type ProcessReporterManager interface { New(conn Connection) ProcessReporter } @@ -32,19 +32,24 @@ func (w *Websocket) SetProcessReportManager(m ProcessReporterManager) { w.processReporter = m } -// DefaultProcessReporter is a default implementation of ProcessReporter -type DefaultProcessReporterManager struct{} +// NewDefaultProcessReporterManager returns a new defaultProcessReporterManager instance +func NewDefaultProcessReporterManager() ProcessReporterManager { + return defaultProcessReporterManager{period: time.Minute} +} + +// defaultProcessReporterManager is a default implementation of ProcessReporter +type defaultProcessReporterManager struct{ period time.Duration } // New returns a new DefaultProcessReporter instance for a connection -func (d DefaultProcessReporterManager) New(conn Connection) ProcessReporter { - reporter := &DefaultProcessReporter{ch: make(chan struct{})} - go reporter.collectMetrics(conn) +func (d defaultProcessReporterManager) New(conn Connection) ProcessReporter { + reporter := &defaultProcessReporter{ch: make(chan struct{})} + go reporter.collectMetrics(conn, d.period) return reporter } // DefaultProcessReporter provides a thread-safe implementation of the ProcessReporter interface. // It tracks operation metrics, including the number of operations, average processing time, and peak processing time. -type DefaultProcessReporter struct { +type defaultProcessReporter struct { operations int64 errors int64 totalProcessingTime time.Duration @@ -55,7 +60,7 @@ type DefaultProcessReporter struct { } // Report logs the processing time for a received data packet and updates metrics. -func (r *DefaultProcessReporter) Report(read time.Time, data []byte, err error) { +func (r *defaultProcessReporter) Report(read time.Time, data []byte, err error) { processingDuration := time.Since(read) r.m.Lock() defer r.m.Unlock() @@ -71,15 +76,18 @@ func (r *DefaultProcessReporter) Report(read time.Time, data []byte, err error) } // Close closes the process reporter -func (r *DefaultProcessReporter) Close() { +func (r *defaultProcessReporter) Close() { r.m.Lock() close(r.ch) r.m.Unlock() } // collectMetrics runs in a separate goroutine to periodically log aggregated metrics. -func (r *DefaultProcessReporter) collectMetrics(conn Connection) { - timer := time.NewTimer(time.Until(time.Now().Truncate(time.Minute).Add(time.Minute))) +func (r *defaultProcessReporter) collectMetrics(conn Connection, period time.Duration) { + if period == 0 { + panic("period duration for collecting metrics must be greater than 0") + } + timer := time.NewTimer(time.Until(time.Now().Truncate(period).Add(period))) defer timer.Stop() for { @@ -87,7 +95,7 @@ func (r *DefaultProcessReporter) collectMetrics(conn Connection) { case <-r.ch: return case <-timer.C: - timer.Reset(time.Until(time.Now().Truncate(time.Minute).Add(time.Minute))) + timer.Reset(time.Until(time.Now().Truncate(period).Add(period))) r.m.Lock() if r.operations > 0 { avgOperationsPerSecond := float64(r.operations) / 60 diff --git a/exchanges/stream/reporting_test.go b/exchanges/stream/reporting_test.go new file mode 100644 index 00000000000..edd1fc1157b --- /dev/null +++ b/exchanges/stream/reporting_test.go @@ -0,0 +1,44 @@ +package stream + +import ( + "context" + "testing" + "time" +) + +type DummyConnection struct { + Connection + ch chan []byte +} + +func (d *DummyConnection) ReadMessage() Response { + return Response{Raw: []byte(<-d.ch)} +} + +func (d *DummyConnection) Push(data []byte) { + d.ch <- data +} + +func (d *DummyConnection) GetURL() string { + return "ws://test" +} + +func ProcessWithSomeSweetLag(context.Context, []byte) error { + time.Sleep(time.Millisecond) + return nil +} + +func TestDefaultProcessReporter(t *testing.T) { + t.Parallel() + w := &Websocket{} + reporterManager := defaultProcessReporterManager{period: time.Millisecond * 10} + w.SetProcessReportManager(&reporterManager) + conn := &DummyConnection{ch: make(chan []byte)} + w.Wg.Add(1) + go w.Reader(context.Background(), conn, ProcessWithSomeSweetLag) + + for range 100 { + conn.Push([]byte("test")) + } + conn.Push(nil) +} diff --git a/exchanges/stream/websocket.go b/exchanges/stream/websocket.go index 1165a635dfb..d8f31e6a930 100644 --- a/exchanges/stream/websocket.go +++ b/exchanges/stream/websocket.go @@ -205,7 +205,7 @@ func (w *Websocket) Setup(s *WebsocketSetup) error { w.rateLimitDefinitions = s.RateLimitDefinitions if s.ExchangeConfig.WebsocketMetricsLogging { - w.processReporter = &DefaultProcessReporterManager{} + w.processReporter = NewDefaultProcessReporterManager() } return nil } diff --git a/exchanges/stream/websocket_test.go b/exchanges/stream/websocket_test.go index 2904bccadee..6ce3bbab6e6 100644 --- a/exchanges/stream/websocket_test.go +++ b/exchanges/stream/websocket_test.go @@ -65,6 +65,7 @@ type testSubKey struct { var defaultSetup = &WebsocketSetup{ ExchangeConfig: &config.Exchange{ + WebsocketMetricsLogging: true, Features: &config.FeaturesConfig{ Enabled: config.FeaturesEnabledConfig{Websocket: true}, }, From 19eb241ee9931eaba93bb45cdbffdf6110c3c8ef Mon Sep 17 00:00:00 2001 From: Ryan O'Hara-Reid Date: Fri, 6 Dec 2024 11:45:05 +1100 Subject: [PATCH 5/9] linter: fix --- exchanges/stream/reporting_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exchanges/stream/reporting_test.go b/exchanges/stream/reporting_test.go index edd1fc1157b..b95d4fef39c 100644 --- a/exchanges/stream/reporting_test.go +++ b/exchanges/stream/reporting_test.go @@ -12,7 +12,7 @@ type DummyConnection struct { } func (d *DummyConnection) ReadMessage() Response { - return Response{Raw: []byte(<-d.ch)} + return Response{Raw: <-d.ch} } func (d *DummyConnection) Push(data []byte) { From e0bb7f249305c1bfe0e23ab86ac755ad12b4dd9d Mon Sep 17 00:00:00 2001 From: Ryan O'Hara-Reid Date: Wed, 11 Dec 2024 14:39:51 +1100 Subject: [PATCH 6/9] Update exchanges/stream/websocket.go Co-authored-by: Scott --- exchanges/stream/websocket.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/exchanges/stream/websocket.go b/exchanges/stream/websocket.go index d8f31e6a930..54664d86fc1 100644 --- a/exchanges/stream/websocket.go +++ b/exchanges/stream/websocket.go @@ -205,7 +205,11 @@ func (w *Websocket) Setup(s *WebsocketSetup) error { w.rateLimitDefinitions = s.RateLimitDefinitions if s.ExchangeConfig.WebsocketMetricsLogging { - w.processReporter = NewDefaultProcessReporterManager() + if s.UseMultiConnectionManagement { + w.processReporter = NewDefaultProcessReporterManager() + } else { + log.Warnf(log.WebsocketMgr, "%s websocket: metrics logging is only supported with multi connection management supported exchanges", w.exchangeName) + } } return nil } From 0b90c2a8bd98f7b34e2ece81b45bb9d1442469ea Mon Sep 17 00:00:00 2001 From: shazbert Date: Wed, 11 Dec 2024 14:48:36 +1100 Subject: [PATCH 7/9] rm dots that are not needed --- exchanges/stream/reporting.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exchanges/stream/reporting.go b/exchanges/stream/reporting.go index 96916355bce..507bf2819be 100644 --- a/exchanges/stream/reporting.go +++ b/exchanges/stream/reporting.go @@ -110,7 +110,7 @@ func (r *defaultProcessReporter) collectMetrics(conn Connection, period time.Dur peakCause = append(peakCause[:100], []byte("...")...) } // Log metrics outside of the critical section to avoid blocking other threads. - log.Debugf(log.WebsocketMgr, "Connection: %v Operations/Second: %.2f, Avg Processing/Operation: %v, Errors: %v Peak: %v Cause: %v...", conn.GetURL(), avgOperationsPerSecond, avgProcessingTime, errors, peakTime, string(peakCause)) + log.Debugf(log.WebsocketMgr, "Connection: %v Operations/Second: %.2f, Avg Processing/Operation: %v, Errors: %v Peak: %v Cause: %v", conn.GetURL(), avgOperationsPerSecond, avgProcessingTime, errors, peakTime, string(peakCause)) } else { r.m.Unlock() } From bc24ad3e1bbeb10a4d3a3824435430fe7704c8c1 Mon Sep 17 00:00:00 2001 From: Ryan O'Hara-Reid Date: Wed, 11 Dec 2024 14:49:04 +1100 Subject: [PATCH 8/9] Update exchanges/stream/reporting.go Co-authored-by: Scott --- exchanges/stream/reporting.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exchanges/stream/reporting.go b/exchanges/stream/reporting.go index 507bf2819be..51608b6dba1 100644 --- a/exchanges/stream/reporting.go +++ b/exchanges/stream/reporting.go @@ -20,7 +20,7 @@ type ProcessReporter interface { // data is the raw data received from the connection. // err is any error that occurred while processing the data. Report(read time.Time, data []byte, err error) - // close closes the process reporter and handles any cleanup. + // Close closes the process reporter and handles any cleanup. Close() } From 73da218c9a51d57ec56e9a54a25fb22e4061ba23 Mon Sep 17 00:00:00 2001 From: shazbert Date: Tue, 14 Jan 2025 16:56:32 +1100 Subject: [PATCH 9/9] config version --- config/versions/v3.go | 39 +++++++++++++++++++++++++++++++ config/versions/v3_test.go | 47 ++++++++++++++++++++++++++++++++++++++ config_example.json | 24 +++++++++++++++++++ testdata/configtest.json | 24 +++++++++++++++++++ 4 files changed, 134 insertions(+) create mode 100644 config/versions/v3.go create mode 100644 config/versions/v3_test.go diff --git a/config/versions/v3.go b/config/versions/v3.go new file mode 100644 index 00000000000..23e10dd9270 --- /dev/null +++ b/config/versions/v3.go @@ -0,0 +1,39 @@ +package versions + +import ( + "context" + "encoding/json" + + "github.com/buger/jsonparser" +) + +// Version3 is an ExchangeVersion to add the websocketMetricsLogging field +type Version3 struct { +} + +func init() { + Manager.registerVersion(3, &Version3{}) +} + +// Exchanges returns all exchanges: "*" +func (v *Version3) Exchanges() []string { return []string{"*"} } + +// UpgradeExchange will upgrade the exchange config with the websocketMetricsLogging field +func (v *Version3) UpgradeExchange(_ context.Context, e []byte) ([]byte, error) { + if len(e) == 0 { + return e, nil + } + if _, _, _, err := jsonparser.Get(e, "websocketMetricsLogging"); err == nil { + return e, nil + } + val, err := json.Marshal(false) + if err != nil { + return nil, err + } + return jsonparser.Set(e, val, "websocketMetricsLogging") +} + +// DowngradeExchange will downgrade the exchange config by removing the websocketMetricsLogging field +func (v *Version3) DowngradeExchange(_ context.Context, e []byte) ([]byte, error) { + return jsonparser.Delete(e, "websocketMetricsLogging"), nil +} diff --git a/config/versions/v3_test.go b/config/versions/v3_test.go new file mode 100644 index 00000000000..3f3bccd4f19 --- /dev/null +++ b/config/versions/v3_test.go @@ -0,0 +1,47 @@ +package versions + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestVersion3UpgradeExchange(t *testing.T) { + t.Parallel() + + got, err := (&Version3{}).UpgradeExchange(context.Background(), nil) + require.NoError(t, err) + require.Nil(t, got) + + payload := []byte(`{"name":"test"}`) + expected := []byte(`{"name":"test","websocketMetricsLogging":false}`) + got, err = (&Version3{}).UpgradeExchange(context.Background(), payload) + require.NoError(t, err) + require.Equal(t, expected, got) + + payload = []byte(`{"name":"test","websocketMetricsLogging":true}`) + got, err = (&Version3{}).UpgradeExchange(context.Background(), payload) + require.NoError(t, err) + require.Equal(t, payload, got) +} + +func TestVersion3DowngradeExchange(t *testing.T) { + t.Parallel() + + got, err := (&Version3{}).DowngradeExchange(context.Background(), nil) + require.NoError(t, err) + require.Nil(t, got) + + payload := []byte(`{"name":"test","websocketMetricsLogging":false}`) + expected := []byte(`{"name":"test"}`) + got, err = (&Version3{}).DowngradeExchange(context.Background(), payload) + require.NoError(t, err) + require.Equal(t, expected, got) +} + +func TestVersion3Exchanges(t *testing.T) { + t.Parallel() + assert := require.New(t) + assert.Equal([]string{"*"}, (&Version3{}).Exchanges()) +} diff --git a/config_example.json b/config_example.json index 97053af4445..74e5cd8ede5 100644 --- a/config_example.json +++ b/config_example.json @@ -266,6 +266,7 @@ "enabled": true, "verbose": false, "httpTimeout": 15000000000, + "websocketMetricsLogging": false, "websocketResponseCheckTimeout": 30000000, "websocketResponseMaxLimit": 7000000000, "websocketTrafficTimeout": 30000000000, @@ -345,6 +346,7 @@ "enabled": true, "verbose": false, "httpTimeout": 15000000000, + "websocketMetricsLogging": false, "websocketResponseCheckTimeout": 30000000, "websocketResponseMaxLimit": 7000000000, "websocketTrafficTimeout": 30000000000, @@ -422,6 +424,7 @@ "enabled": true, "verbose": false, "httpTimeout": 15000000000, + "websocketMetricsLogging": false, "websocketResponseCheckTimeout": 30000000, "websocketResponseMaxLimit": 7000000000, "websocketTrafficTimeout": 30000000000, @@ -499,6 +502,7 @@ "enabled": true, "verbose": false, "httpTimeout": 15000000000, + "websocketMetricsLogging": false, "websocketResponseCheckTimeout": 30000000, "websocketResponseMaxLimit": 7000000000, "websocketTrafficTimeout": 30000000000, @@ -583,6 +587,7 @@ "enabled": true, "verbose": false, "httpTimeout": 15000000000, + "websocketMetricsLogging": false, "websocketResponseCheckTimeout": 30000000, "websocketResponseMaxLimit": 7000000000, "websocketTrafficTimeout": 30000000000, @@ -659,6 +664,7 @@ "enabled": true, "verbose": false, "httpTimeout": 15000000000, + "websocketMetricsLogging": false, "websocketResponseCheckTimeout": 30000000, "websocketResponseMaxLimit": 7000000000, "websocketTrafficTimeout": 30000000000, @@ -738,6 +744,7 @@ "enabled": true, "verbose": false, "httpTimeout": 15000000000, + "websocketMetricsLogging": false, "websocketResponseCheckTimeout": 30000000, "websocketResponseMaxLimit": 7000000000, "websocketTrafficTimeout": 30000000000, @@ -821,6 +828,7 @@ "enabled": true, "verbose": false, "httpTimeout": 15000000000, + "websocketMetricsLogging": false, "websocketResponseCheckTimeout": 30000000, "websocketResponseMaxLimit": 7000000000, "websocketTrafficTimeout": 30000000000, @@ -933,6 +941,7 @@ "enabled": true, "verbose": false, "httpTimeout": 15000000000, + "websocketMetricsLogging": false, "websocketResponseCheckTimeout": 30000000, "websocketResponseMaxLimit": 7000000000, "websocketTrafficTimeout": 30000000000, @@ -1010,6 +1019,7 @@ "enabled": true, "verbose": false, "httpTimeout": 15000000000, + "websocketMetricsLogging": false, "websocketResponseCheckTimeout": 30000000, "websocketResponseMaxLimit": 7000000000, "websocketTrafficTimeout": 30000000000, @@ -1136,6 +1146,7 @@ "enabled": true, "verbose": false, "httpTimeout": 15000000000, + "websocketMetricsLogging": false, "websocketResponseCheckTimeout": 30000000, "websocketResponseMaxLimit": 7000000000, "websocketTrafficTimeout": 30000000000, @@ -1213,6 +1224,7 @@ "enabled": true, "verbose": false, "httpTimeout": 15000000000, + "websocketMetricsLogging": false, "websocketResponseCheckTimeout": 30000000, "websocketResponseMaxLimit": 7000000000, "websocketTrafficTimeout": 30000000000, @@ -1293,6 +1305,7 @@ "enabled": true, "verbose": false, "httpTimeout": 15000000000, + "websocketMetricsLogging": false, "websocketResponseCheckTimeout": 30000000, "websocketResponseMaxLimit": 7000000000, "websocketTrafficTimeout": 30000000000, @@ -1433,6 +1446,7 @@ "enabled": true, "verbose": false, "httpTimeout": 15000000000, + "websocketMetricsLogging": false, "websocketResponseCheckTimeout": 30000000, "websocketResponseMaxLimit": 7000000000, "websocketTrafficTimeout": 30000000000, @@ -1512,6 +1526,7 @@ "enabled": true, "verbose": false, "httpTimeout": 15000000000, + "websocketMetricsLogging": false, "websocketResponseCheckTimeout": 30000000, "websocketResponseMaxLimit": 7000000000, "websocketTrafficTimeout": 30000000000, @@ -1621,6 +1636,7 @@ "enabled": true, "verbose": false, "httpTimeout": 15000000000, + "websocketMetricsLogging": false, "websocketResponseCheckTimeout": 30000000, "websocketResponseMaxLimit": 7000000000, "websocketTrafficTimeout": 30000000000, @@ -1696,6 +1712,7 @@ "enabled": true, "verbose": false, "httpTimeout": 15000000000, + "websocketMetricsLogging": false, "websocketResponseCheckTimeout": 30000000, "websocketResponseMaxLimit": 7000000000, "websocketTrafficTimeout": 30000000000, @@ -1773,6 +1790,7 @@ "enabled": true, "verbose": false, "httpTimeout": 15000000000, + "websocketMetricsLogging": false, "websocketResponseCheckTimeout": 30000000, "websocketResponseMaxLimit": 7000000000, "websocketTrafficTimeout": 30000000000, @@ -1850,6 +1868,7 @@ "enabled": true, "verbose": false, "httpTimeout": 15000000000, + "websocketMetricsLogging": false, "websocketResponseCheckTimeout": 30000000, "websocketResponseMaxLimit": 7000000000, "websocketTrafficTimeout": 30000000000, @@ -1930,6 +1949,7 @@ "enabled": true, "verbose": false, "httpTimeout": 15000000000, + "websocketMetricsLogging": false, "websocketResponseCheckTimeout": 30000000, "websocketResponseMaxLimit": 7000000000, "websocketTrafficTimeout": 30000000000, @@ -2037,6 +2057,7 @@ "enabled": true, "verbose": false, "httpTimeout": 15000000000, + "websocketMetricsLogging": false, "websocketResponseCheckTimeout": 30000000, "websocketResponseMaxLimit": 7000000000, "websocketTrafficTimeout": 30000000000, @@ -2115,6 +2136,7 @@ "enabled": true, "verbose": false, "httpTimeout": 15000000000, + "websocketMetricsLogging": false, "websocketResponseCheckTimeout": 30000000, "websocketResponseMaxLimit": 7000000000, "websocketTrafficTimeout": 30000000000, @@ -2227,6 +2249,7 @@ "enabled": true, "verbose": false, "httpTimeout": 15000000000, + "websocketMetricsLogging": false, "websocketResponseCheckTimeout": 30000000, "websocketResponseMaxLimit": 7000000000, "websocketTrafficTimeout": 30000000000, @@ -2305,6 +2328,7 @@ "enabled": true, "verbose": false, "httpTimeout": 15000000000, + "websocketMetricsLogging": false, "websocketResponseCheckTimeout": 30000000, "websocketResponseMaxLimit": 7000000000, "websocketTrafficTimeout": 30000000000, diff --git a/testdata/configtest.json b/testdata/configtest.json index 04f87dc7cc6..6db34a62de6 100644 --- a/testdata/configtest.json +++ b/testdata/configtest.json @@ -246,6 +246,7 @@ "enabled": true, "verbose": false, "httpTimeout": 15000000000, + "websocketMetricsLogging": false, "websocketResponseCheckTimeout": 30000000, "websocketResponseMaxLimit": 7000000000, "websocketTrafficTimeout": 30000000000, @@ -327,6 +328,7 @@ "enabled": true, "verbose": false, "httpTimeout": 15000000000, + "websocketMetricsLogging": false, "websocketResponseCheckTimeout": 30000000, "websocketResponseMaxLimit": 7000000000, "websocketTrafficTimeout": 30000000000, @@ -408,6 +410,7 @@ "enabled": true, "verbose": false, "httpTimeout": 15000000000, + "websocketMetricsLogging": false, "websocketResponseCheckTimeout": 30000000, "websocketResponseMaxLimit": 7000000000, "websocketTrafficTimeout": 30000000000, @@ -489,6 +492,7 @@ "enabled": true, "verbose": false, "httpTimeout": 15000000000, + "websocketMetricsLogging": false, "websocketResponseCheckTimeout": 30000000, "websocketResponseMaxLimit": 7000000000, "websocketTrafficTimeout": 30000000000, @@ -573,6 +577,7 @@ "enabled": true, "verbose": false, "httpTimeout": 15000000000, + "websocketMetricsLogging": false, "websocketResponseCheckTimeout": 30000000, "websocketResponseMaxLimit": 7000000000, "websocketTrafficTimeout": 30000000000, @@ -690,6 +695,7 @@ "enabled": true, "verbose": false, "httpTimeout": 15000000000, + "websocketMetricsLogging": false, "websocketResponseCheckTimeout": 30000000, "websocketResponseMaxLimit": 7000000000, "websocketTrafficTimeout": 30000000000, @@ -771,6 +777,7 @@ "enabled": true, "verbose": false, "httpTimeout": 15000000000, + "websocketMetricsLogging": false, "websocketResponseCheckTimeout": 30000000, "websocketResponseMaxLimit": 7000000000, "websocketTrafficTimeout": 30000000000, @@ -853,6 +860,7 @@ "enabled": true, "verbose": false, "httpTimeout": 15000000000, + "websocketMetricsLogging": false, "websocketResponseCheckTimeout": 30000000, "websocketResponseMaxLimit": 7000000000, "websocketTrafficTimeout": 30000000000, @@ -955,6 +963,7 @@ "enabled": true, "verbose": false, "httpTimeout": 15000000000, + "websocketMetricsLogging": false, "websocketResponseCheckTimeout": 30000000, "websocketResponseMaxLimit": 7000000000, "websocketTrafficTimeout": 30000000000, @@ -1032,6 +1041,7 @@ "enabled": true, "verbose": false, "httpTimeout": 15000000000, + "websocketMetricsLogging": false, "websocketResponseCheckTimeout": 30000000, "websocketResponseMaxLimit": 7000000000, "websocketTrafficTimeout": 30000000000, @@ -1161,6 +1171,7 @@ "enabled": true, "verbose": false, "httpTimeout": 15000000000, + "websocketMetricsLogging": false, "websocketResponseCheckTimeout": 30000000, "websocketResponseMaxLimit": 7000000000, "websocketTrafficTimeout": 30000000000, @@ -1238,6 +1249,7 @@ "enabled": true, "verbose": false, "httpTimeout": 15000000000, + "websocketMetricsLogging": false, "websocketResponseCheckTimeout": 30000000, "websocketResponseMaxLimit": 7000000000, "websocketTrafficTimeout": 30000000000, @@ -1318,6 +1330,7 @@ "enabled": true, "verbose": false, "httpTimeout": 15000000000, + "websocketMetricsLogging": false, "websocketResponseCheckTimeout": 30000000, "websocketResponseMaxLimit": 7000000000, "websocketTrafficTimeout": 30000000000, @@ -1458,6 +1471,7 @@ "enabled": true, "verbose": false, "httpTimeout": 15000000000, + "websocketMetricsLogging": false, "websocketResponseCheckTimeout": 30000000, "websocketResponseMaxLimit": 7000000000, "websocketTrafficTimeout": 30000000000, @@ -1537,6 +1551,7 @@ "enabled": true, "verbose": false, "httpTimeout": 15000000000, + "websocketMetricsLogging": false, "websocketResponseCheckTimeout": 30000000, "websocketResponseMaxLimit": 7000000000, "websocketTrafficTimeout": 30000000000, @@ -1647,6 +1662,7 @@ "enabled": true, "verbose": false, "httpTimeout": 15000000000, + "websocketMetricsLogging": false, "websocketResponseCheckTimeout": 30000000, "websocketResponseMaxLimit": 7000000000, "websocketTrafficTimeout": 30000000000, @@ -1725,6 +1741,7 @@ "enabled": true, "verbose": false, "httpTimeout": 15000000000, + "websocketMetricsLogging": false, "websocketResponseCheckTimeout": 30000000, "websocketResponseMaxLimit": 7000000000, "websocketTrafficTimeout": 30000000000, @@ -1806,6 +1823,7 @@ "enabled": true, "verbose": false, "httpTimeout": 15000000000, + "websocketMetricsLogging": false, "websocketResponseCheckTimeout": 30000000, "websocketResponseMaxLimit": 7000000000, "websocketTrafficTimeout": 30000000000, @@ -1912,6 +1930,7 @@ "enabled": true, "verbose": false, "httpTimeout": 15000000000, + "websocketMetricsLogging": false, "websocketResponseCheckTimeout": 30000000, "websocketResponseMaxLimit": 7000000000, "websocketTrafficTimeout": 30000000000, @@ -2009,6 +2028,7 @@ "enabled": true, "verbose": false, "httpTimeout": 15000000000, + "websocketMetricsLogging": false, "websocketResponseCheckTimeout": 30000000, "websocketResponseMaxLimit": 7000000000, "websocketTrafficTimeout": 30000000000, @@ -2116,6 +2136,7 @@ "enabled": true, "verbose": false, "httpTimeout": 15000000000, + "websocketMetricsLogging": false, "websocketResponseCheckTimeout": 30000000, "websocketResponseMaxLimit": 7000000000, "websocketTrafficTimeout": 30000000000, @@ -2194,6 +2215,7 @@ "enabled": true, "verbose": false, "httpTimeout": 15000000000, + "websocketMetricsLogging": false, "websocketResponseCheckTimeout": 30000000, "websocketResponseMaxLimit": 7000000000, "websocketTrafficTimeout": 30000000000, @@ -2306,6 +2328,7 @@ "enabled": true, "verbose": false, "httpTimeout": 15000000000, + "websocketMetricsLogging": false, "websocketResponseCheckTimeout": 30000000, "websocketResponseMaxLimit": 7000000000, "websocketTrafficTimeout": 30000000000, @@ -2384,6 +2407,7 @@ "enabled": true, "verbose": false, "httpTimeout": 15000000000, + "websocketMetricsLogging": false, "websocketResponseCheckTimeout": 30000000, "websocketResponseMaxLimit": 7000000000, "websocketTrafficTimeout": 30000000000,