Skip to content

Commit

Permalink
Fix potential unbounded LLO transmit queue (#16166)
Browse files Browse the repository at this point in the history
* Refactor LLO transmission queue cleanup #16166

Bugfixes and improvements intended to make queue management more reliable and safe.

- Reduces required number of DB connections and reduce overall DB transaction load
- Fixes potential for unbounded queue growth
- Fixes possibility of OOM trying to load too many records on boot
- Remove duplicate deletion code from server.go and manage everything in the persistence manager
- Introduce an application-wide global reaper for last-ditch cleanup effort
- Implement delete batching for more reliable and incremental deletion
- Ensure that records are properly removed on exit

* Fix conflict with develop
  • Loading branch information
samsondav authored Feb 5, 2025
1 parent 6ca1531 commit 3a7cfeb
Show file tree
Hide file tree
Showing 48 changed files with 1,202 additions and 401 deletions.
11 changes: 11 additions & 0 deletions .changeset/sixty-eagles-punch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
"chainlink": patch
---

Performance improvements and bugfixes for LLO transmission queue #internal #fixed

- Reduces required number of DB connections and reduce overall DB transaction load
- Remove duplicate deletion code from server.go and manage everything in the persistence manager
- Introduce an application-wide global reaper for last-ditch cleanup effort
- Implement delete batching for more reliable and incremental deletion
- Ensure that records are properly removed on exit
2 changes: 2 additions & 0 deletions core/cmd/shell.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.G
capabilitiesRegistry := capabilities.NewRegistry(appLggr)

retirementReportCache := llo.NewRetirementReportCache(appLggr, ds)
lloReaper := llo.NewTransmissionReaper(ds, appLggr, cfg.Mercury().Transmitter().ReaperFrequency().Duration(), cfg.Mercury().Transmitter().ReaperMaxAge().Duration())

unrestrictedClient := clhttp.NewUnrestrictedHTTPClient()
// create the relayer-chain interoperators from application configuration
Expand Down Expand Up @@ -338,6 +339,7 @@ func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.G
GRPCOpts: grpcOpts,
MercuryPool: mercuryPool,
RetirementReportCache: retirementReportCache,
LLOTransmissionReaper: lloReaper,
CapabilitiesRegistry: capabilitiesRegistry,
})
}
Expand Down
6 changes: 6 additions & 0 deletions core/config/docs/core.toml
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,12 @@ TransmitTimeout = "5s" # Default
#
# Only has effect with LLO jobs.
TransmitConcurrency = 100 # Default
# ReaperFrequency controls how often the stale transmission reaper will run.
# Setting to 0 disables the reaper.
ReaperFrequency = "1h" # Default
# ReaperMaxAge controls how old a transmission can be before it is considered
# stale. Setting to 0 disables the reaper.
ReaperMaxAge = "48h" # Default

# Telemetry holds OTEL settings.
# This data includes open telemetry metrics, traces, & logs.
Expand Down
2 changes: 2 additions & 0 deletions core/config/mercury_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ type MercuryTransmitter interface {
TransmitQueueMaxSize() uint32
TransmitTimeout() commonconfig.Duration
TransmitConcurrency() uint32
ReaperFrequency() commonconfig.Duration
ReaperMaxAge() commonconfig.Duration
}

type Mercury interface {
Expand Down
8 changes: 8 additions & 0 deletions core/config/toml/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1333,6 +1333,8 @@ type MercuryTransmitter struct {
TransmitQueueMaxSize *uint32
TransmitTimeout *commonconfig.Duration
TransmitConcurrency *uint32
ReaperFrequency *commonconfig.Duration
ReaperMaxAge *commonconfig.Duration
}

func (m *MercuryTransmitter) setFrom(f *MercuryTransmitter) {
Expand All @@ -1348,6 +1350,12 @@ func (m *MercuryTransmitter) setFrom(f *MercuryTransmitter) {
if v := f.TransmitConcurrency; v != nil {
m.TransmitConcurrency = v
}
if v := f.ReaperFrequency; v != nil {
m.ReaperFrequency = v
}
if v := f.ReaperMaxAge; v != nil {
m.ReaperMaxAge = v
}
}

type Mercury struct {
Expand Down
14 changes: 9 additions & 5 deletions core/logger/test_logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@ import (
// Log level is DEBUG by default.
//
// Note: It is not necessary to Sync().
func TestLogger(tb testing.TB) SugaredLogger {
return testLogger(tb, nil)
func TestLogger(tb testing.TB, lvl ...zapcore.Level) SugaredLogger {
defaultLevel := zapcore.DebugLevel
if len(lvl) > 0 {
defaultLevel = lvl[0]
}
return testLogger(tb, nil, defaultLevel)
}

// TestLoggerObserved creates a logger with an observer that can be used to
Expand All @@ -24,12 +28,12 @@ func TestLogger(tb testing.TB) SugaredLogger {
// Note: It is not necessary to Sync().
func TestLoggerObserved(tb testing.TB, lvl zapcore.Level) (Logger, *observer.ObservedLogs) {
observedZapCore, observedLogs := observer.New(lvl)
return testLogger(tb, observedZapCore), observedLogs
return testLogger(tb, observedZapCore, lvl), observedLogs
}

// testLogger returns a new SugaredLogger for tests. core is optional.
func testLogger(tb testing.TB, core zapcore.Core) SugaredLogger {
a := zap.NewAtomicLevelAt(zap.DebugLevel)
func testLogger(tb testing.TB, core zapcore.Core, lvl zapcore.Level) SugaredLogger {
a := zap.NewAtomicLevelAt(lvl)
opts := []zaptest.LoggerOption{zaptest.Level(a)}
zapOpts := []zap.Option{zap.AddCaller(), zap.AddStacktrace(zapcore.ErrorLevel)}
if core != nil {
Expand Down
4 changes: 4 additions & 0 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ type ApplicationOpts struct {
GRPCOpts loop.GRPCOpts
MercuryPool wsrpc.Pool
RetirementReportCache llo.RetirementReportCache
LLOTransmissionReaper services.ServiceCtx
CapabilitiesRegistry *capabilities.Registry
CapabilitiesDispatcher remotetypes.Dispatcher
CapabilitiesPeerWrapper p2ptypes.PeerWrapper
Expand Down Expand Up @@ -488,6 +489,9 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
if opts.RetirementReportCache != nil {
srvcs = append(srvcs, opts.RetirementReportCache)
}
if opts.LLOTransmissionReaper != nil {
srvcs = append(srvcs, opts.LLOTransmissionReaper)
}

// EVM chains are used all over the place. This will need to change for fully EVM extraction
// TODO: BCF-2510, BCF-2511
Expand Down
8 changes: 8 additions & 0 deletions core/services/chainlink/config_mercury.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,14 @@ func (m *mercuryTransmitterConfig) TransmitConcurrency() uint32 {
return *m.c.TransmitConcurrency
}

func (m *mercuryTransmitterConfig) ReaperFrequency() commonconfig.Duration {
return *m.c.ReaperFrequency
}

func (m *mercuryTransmitterConfig) ReaperMaxAge() commonconfig.Duration {
return *m.c.ReaperMaxAge
}

type mercuryConfig struct {
c toml.Mercury
s toml.MercurySecrets
Expand Down
4 changes: 4 additions & 0 deletions core/services/chainlink/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -862,6 +862,8 @@ func TestConfig_Marshal(t *testing.T) {
TransmitQueueMaxSize: ptr(uint32(123)),
TransmitTimeout: commoncfg.MustNewDuration(234 * time.Second),
TransmitConcurrency: ptr(uint32(456)),
ReaperFrequency: commoncfg.MustNewDuration(567 * time.Second),
ReaperMaxAge: commoncfg.MustNewDuration(678 * time.Hour),
},
VerboseLogging: ptr(true),
}
Expand Down Expand Up @@ -1381,6 +1383,8 @@ Protocol = 'grpc'
TransmitQueueMaxSize = 123
TransmitTimeout = '3m54s'
TransmitConcurrency = 456
ReaperFrequency = '9m27s'
ReaperMaxAge = '678h0m0s'
`},
{"full", full, fullTOML},
{"multi-chain", multiChain, multiChainTOML},
Expand Down
2 changes: 2 additions & 0 deletions core/services/chainlink/testdata/config-empty-effective.toml
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ Protocol = 'wsrpc'
TransmitQueueMaxSize = 100000
TransmitTimeout = '5s'
TransmitConcurrency = 100
ReaperFrequency = '1h0m0s'
ReaperMaxAge = '48h0m0s'

[Capabilities]
[Capabilities.RateLimit]
Expand Down
2 changes: 2 additions & 0 deletions core/services/chainlink/testdata/config-full.toml
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,8 @@ Protocol = 'grpc'
TransmitQueueMaxSize = 123
TransmitTimeout = '3m54s'
TransmitConcurrency = 456
ReaperFrequency = '9m27s'
ReaperMaxAge = '678h0m0s'

[Capabilities]
[Capabilities.RateLimit]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ Protocol = 'wsrpc'
TransmitQueueMaxSize = 100000
TransmitTimeout = '5s'
TransmitConcurrency = 100
ReaperFrequency = '1h0m0s'
ReaperMaxAge = '48h0m0s'

[Capabilities]
[Capabilities.RateLimit]
Expand Down
113 changes: 109 additions & 4 deletions core/services/llo/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package llo

import (
"context"
"database/sql"
"fmt"
"time"

"github.com/ethereum/go-ethereum/common"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
"github.com/smartcontractkit/chainlink/v2/core/services/llo/mercurytransmitter"
)

func Cleanup(ctx context.Context, lp LogPoller, addr common.Address, donID uint32, ds sqlutil.DataSource, chainSelector uint64) error {
Expand All @@ -20,9 +23,111 @@ func Cleanup(ctx context.Context, lp LogPoller, addr common.Address, donID uint3
return fmt.Errorf("failed to cleanup channel definitions: %w", err)
}
}
torm := mercurytransmitter.NewORM(ds, donID)
if err := torm.Cleanup(ctx); err != nil {
return fmt.Errorf("failed to cleanup transmitter: %w", err)
// Don't bother deleting transmission records since it can be really slow
// to do that if you have a job that's been erroring for a long time. Let
// the reaper handle it async instead.
return nil
}

const (
// TransmissionReaperBatchSize is the number of transmissions to delete in a
// single batch.
TransmissionReaperBatchSize = 10_000
// TransmissionReaperRetryFrequency is the frequency at which the reaper
// will retry if it fails to delete stale transmissions.
TransmissionReaperRetryFrequency = 5 * time.Second
)

type transmissionReaper struct {
services.Service
eng *services.Engine
ds sqlutil.DataSource
lggr logger.Logger
reapFreq time.Duration
maxAge time.Duration
}

// NewTransmissionReaper returns a new transmission reaper service
//
// In theory, if everything is working properly, there will never be stale
// transmissions. In practice there can be bugs, jobs that get deleted without
// proper cleanup etc. This acts as a sanity check to evict obviously stale
// entries from the llo_mercury_transmit_queue table.
func NewTransmissionReaper(ds sqlutil.DataSource, lggr logger.Logger, freq, maxAge time.Duration) services.Service {
t := &transmissionReaper{ds: ds, lggr: lggr, reapFreq: freq, maxAge: maxAge}
t.Service, t.eng = services.Config{
Name: "LLOTransmissionReaper",
Start: t.start,
}.NewServiceEngine(lggr)
return t
}

func (t *transmissionReaper) start(context.Context) error {
if t.reapFreq == 0 || t.maxAge == 0 {
t.eng.Debugw("Transmission reaper disabled", "reapFreq", t.reapFreq, "maxAge", t.maxAge)
return nil
}
t.eng.Go(t.runLoop)
return nil
}

func (t *transmissionReaper) runLoop(ctx context.Context) {
t.eng.Debugw("Transmission reaper running", "reapFreq", t.reapFreq, "maxAge", t.maxAge)
ticker := services.NewTicker(t.reapFreq)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
for {
// TODO: Could also automatically reap orphaned transmissions
// that don't have a job with a matching DON ID (from job
// deletion)
//
// https://smartcontract-it.atlassian.net/browse/MERC-6807
if n, err := t.reapStale(ctx, TransmissionReaperBatchSize); err != nil {
t.lggr.Errorw("Failed to reap", "err", err)
select {
case <-ctx.Done():
return
case <-time.After(TransmissionReaperRetryFrequency):
continue
}
} else {
t.lggr.Debugw("Reaped stale transmissions", "nDeleted", n)
}
}
}
}
}

func (t *transmissionReaper) reapStale(ctx context.Context, batchSize int) (rowsDeleted int64, err error) {
for {
var res sql.Result
res, err = t.ds.ExecContext(ctx, `
DELETE FROM llo_mercury_transmit_queue AS q
USING (
SELECT transmission_hash
FROM llo_mercury_transmit_queue
WHERE inserted_at < $1
ORDER BY inserted_at ASC
LIMIT $2
) AS to_delete
WHERE q.transmission_hash = to_delete.transmission_hash;
`, time.Now().Add(-t.maxAge), batchSize)
if err != nil {
return rowsDeleted, fmt.Errorf("transmissionReaper: failed to delete stale transmissions: %w", err)
}
var rowsAffected int64
rowsAffected, err = res.RowsAffected()
if err != nil {
return rowsDeleted, fmt.Errorf("transmissionReaper: failed to get rows affected: %w", err)
}
if rowsAffected == 0 {
break
}
rowsDeleted += rowsAffected
}
return rowsDeleted, nil
}
57 changes: 50 additions & 7 deletions core/services/llo/cleanup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package llo

import (
"testing"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/assert"
Expand All @@ -14,9 +15,18 @@ import (
llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/llo/mercurytransmitter"
)

func makeSampleTransmissions(n int) []*mercurytransmitter.Transmission {
transmissions := make([]*mercurytransmitter.Transmission, n)
for i := 0; i < n; i++ {
transmissions[i] = makeSampleTransmission(uint64(i), "http://example.com/foo") //nolint:gosec // G115 don't care in test code
}
return transmissions
}

func makeSampleTransmission(seqNr uint64, sURL string) *mercurytransmitter.Transmission {
return &mercurytransmitter.Transmission{
ServerURL: sURL,
Expand Down Expand Up @@ -84,19 +94,52 @@ func Test_Cleanup(t *testing.T) {
require.NoError(t, err)
assert.NotNil(t, pd)
})
t.Run("removes transmissions", func(t *testing.T) {
trs, err := torm1.Get(ctx, srvURL1)
t.Run("does not remove transmissions", func(t *testing.T) {
trs, err := torm1.Get(ctx, srvURL1, 10)
require.NoError(t, err)
assert.Len(t, trs, 0)
trs, err = torm1.Get(ctx, srvURL2)
assert.Len(t, trs, 1)
trs, err = torm1.Get(ctx, srvURL2, 10)
require.NoError(t, err)
assert.Len(t, trs, 0)
assert.Len(t, trs, 1)

trs, err = torm2.Get(ctx, srvURL1)
trs, err = torm2.Get(ctx, srvURL1, 10)
require.NoError(t, err)
assert.Len(t, trs, 1)
trs, err = torm2.Get(ctx, srvURL2)
trs, err = torm2.Get(ctx, srvURL2, 10)
require.NoError(t, err)
assert.Len(t, trs, 1)
})
}

func Test_TransmissionReaper(t *testing.T) {
ds := pgtest.NewSqlxDB(t)
lggr := logger.TestLogger(t)
tr := &transmissionReaper{ds: ds, lggr: lggr, maxAge: 24 * time.Hour}
ctx := testutils.Context(t)

const n = 13

transmissions := makeSampleTransmissions(n)
torm := mercurytransmitter.NewORM(ds, 1)
err := torm.Insert(testutils.Context(t), transmissions)
require.NoError(t, err)
pgtest.MustExec(t, ds, `
UPDATE llo_mercury_transmit_queue
SET inserted_at = NOW() - INTERVAL '48 hours'
WHERE transmission_hash IN (
SELECT transmission_hash FROM llo_mercury_transmit_queue
LIMIT 5
);
`)

// test batching
d, err := tr.reapStale(ctx, n/3)
require.NoError(t, err)
assert.Equal(t, int64(5), d)

pgtest.MustExec(t, ds, "UPDATE llo_mercury_transmit_queue SET inserted_at = NOW() - INTERVAL '48 hours'")

d, err = tr.reapStale(ctx, n/3)
require.NoError(t, err)
assert.Equal(t, int64(n-5), d)
}
Loading

0 comments on commit 3a7cfeb

Please sign in to comment.