From d90c3b50205cafef7fc4fa5b3fe15574598b04ab Mon Sep 17 00:00:00 2001 From: nick Date: Tue, 27 Aug 2024 16:44:00 +0900 Subject: [PATCH 1/2] fix: consume channel to prevent memory leak --- .../chain/websocketchainreader/websocketreader.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/node/pkg/chain/websocketchainreader/websocketreader.go b/node/pkg/chain/websocketchainreader/websocketreader.go index d77fe1c16..8c74d21f8 100644 --- a/node/pkg/chain/websocketchainreader/websocketreader.go +++ b/node/pkg/chain/websocketchainreader/websocketreader.go @@ -115,14 +115,25 @@ func (c *ChainReader) handleSubscription(ctx context.Context, config *SubscribeC Addresses: []common.Address{common.HexToAddress(config.Address)}, } + headerSubChan := make(chan *types.Header, 1) // Subscribe to new head just to keep connection alive (ignoring the results) - subNewHead, err := c.client(config.ChainType).SubscribeNewHead(ctx, make(chan *types.Header)) + subNewHead, err := c.client(config.ChainType).SubscribeNewHead(ctx, headerSubChan) if err != nil { log.Warn().Err(err).Msg("Failed to subscribe to new head") } else { defer subNewHead.Unsubscribe() } + go func() { + for { + select { + case <-ctx.Done(): + return + case <-headerSubChan: + } + } + }() + logs := make(chan types.Log) sub, err := c.client(config.ChainType).SubscribeFilterLogs(ctx, query, logs) if err != nil { From f10563a424d51dc4dc9fa62d890d35f077752d7d Mon Sep 17 00:00:00 2001 From: nick Date: Tue, 27 Aug 2024 16:55:34 +0900 Subject: [PATCH 2/2] fix: update based on feedback --- node/pkg/chain/websocketchainreader/websocketreader.go | 1 + 1 file changed, 1 insertion(+) diff --git a/node/pkg/chain/websocketchainreader/websocketreader.go b/node/pkg/chain/websocketchainreader/websocketreader.go index 8c74d21f8..56ebd2d65 100644 --- a/node/pkg/chain/websocketchainreader/websocketreader.go +++ b/node/pkg/chain/websocketchainreader/websocketreader.go @@ -116,6 +116,7 @@ func (c *ChainReader) handleSubscription(ctx context.Context, config *SubscribeC } headerSubChan := make(chan *types.Header, 1) + defer close(headerSubChan) // Subscribe to new head just to keep connection alive (ignoring the results) subNewHead, err := c.client(config.ChainType).SubscribeNewHead(ctx, headerSubChan) if err != nil {