Skip to content

Commit

Permalink
Refactor LLO transmission queue cleanup #16166
Browse files Browse the repository at this point in the history
We have observed unbounded queue growth in production. Not sure of the cause. This PR implements a swathe of measures intended to make queue management more reliable and safe.

- Reduces required number of DB connections and reduce overall DB transaction load
- Remove duplicate deletion code from server.go and manage everything in the persistence manager
- Introduce an application-wide global reaper for last-ditch cleanup effort
- Implement delete batching for more reliable and incremental deletion
  • Loading branch information
samsondav committed Feb 3, 2025
1 parent ce0d4c1 commit a32255c
Show file tree
Hide file tree
Showing 44 changed files with 584 additions and 379 deletions.
2 changes: 2 additions & 0 deletions core/cmd/shell.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.G
capabilitiesRegistry := capabilities.NewRegistry(appLggr)

retirementReportCache := llo.NewRetirementReportCache(appLggr, ds)
lloReaper := llo.NewTransmissionReaper(ds, appLggr, cfg.Mercury().Transmitter().ReaperFrequency().Duration(), cfg.Mercury().Transmitter().ReaperMaxAge().Duration())

unrestrictedClient := clhttp.NewUnrestrictedHTTPClient()
// create the relayer-chain interoperators from application configuration
Expand Down Expand Up @@ -338,6 +339,7 @@ func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.G
GRPCOpts: grpcOpts,
MercuryPool: mercuryPool,
RetirementReportCache: retirementReportCache,
LLOTransmissionReaper: lloReaper,
CapabilitiesRegistry: capabilitiesRegistry,
})
}
Expand Down
6 changes: 6 additions & 0 deletions core/config/docs/core.toml
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,12 @@ TransmitTimeout = "5s" # Default
#
# Only has effect with LLO jobs.
TransmitConcurrency = 100 # Default
# ReaperFrequency controls how often the stale transmission reaper will run.
# Setting to 0 disables the reaper.
ReaperFrequency = "1h" # Default
# ReaperMaxAge controls how old a transmission can be before it is considered
# stale. Setting to 0 disables the reaper.
ReaperMaxAge = "48h" # Default

# Telemetry holds OTEL settings.
# This data includes open telemetry metrics, traces, & logs.
Expand Down
2 changes: 2 additions & 0 deletions core/config/mercury_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ type MercuryTransmitter interface {
TransmitQueueMaxSize() uint32
TransmitTimeout() commonconfig.Duration
TransmitConcurrency() uint32
ReaperFrequency() commonconfig.Duration
ReaperMaxAge() commonconfig.Duration
}

type Mercury interface {
Expand Down
8 changes: 8 additions & 0 deletions core/config/toml/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1329,6 +1329,8 @@ type MercuryTransmitter struct {
TransmitQueueMaxSize *uint32
TransmitTimeout *commonconfig.Duration
TransmitConcurrency *uint32
ReaperFrequency *commonconfig.Duration
ReaperMaxAge *commonconfig.Duration
}

func (m *MercuryTransmitter) setFrom(f *MercuryTransmitter) {
Expand All @@ -1344,6 +1346,12 @@ func (m *MercuryTransmitter) setFrom(f *MercuryTransmitter) {
if v := f.TransmitConcurrency; v != nil {
m.TransmitConcurrency = v
}
if v := f.ReaperFrequency; v != nil {
m.ReaperFrequency = v
}
if v := f.ReaperMaxAge; v != nil {
m.ReaperMaxAge = v
}
}

type Mercury struct {
Expand Down
4 changes: 4 additions & 0 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ type ApplicationOpts struct {
GRPCOpts loop.GRPCOpts
MercuryPool wsrpc.Pool
RetirementReportCache llo.RetirementReportCache
LLOTransmissionReaper services.ServiceCtx
CapabilitiesRegistry *capabilities.Registry
CapabilitiesDispatcher remotetypes.Dispatcher
CapabilitiesPeerWrapper p2ptypes.PeerWrapper
Expand Down Expand Up @@ -488,6 +489,9 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
if opts.RetirementReportCache != nil {
srvcs = append(srvcs, opts.RetirementReportCache)
}
if opts.LLOTransmissionReaper != nil {
srvcs = append(srvcs, opts.LLOTransmissionReaper)
}

// EVM chains are used all over the place. This will need to change for fully EVM extraction
// TODO: BCF-2510, BCF-2511
Expand Down
8 changes: 8 additions & 0 deletions core/services/chainlink/config_mercury.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,14 @@ func (m *mercuryTransmitterConfig) TransmitConcurrency() uint32 {
return *m.c.TransmitConcurrency
}

func (m *mercuryTransmitterConfig) ReaperFrequency() commonconfig.Duration {
return *m.c.ReaperFrequency
}

func (m *mercuryTransmitterConfig) ReaperMaxAge() commonconfig.Duration {
return *m.c.ReaperMaxAge
}

type mercuryConfig struct {
c toml.Mercury
s toml.MercurySecrets
Expand Down
4 changes: 4 additions & 0 deletions core/services/chainlink/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -861,6 +861,8 @@ func TestConfig_Marshal(t *testing.T) {
TransmitQueueMaxSize: ptr(uint32(123)),
TransmitTimeout: commoncfg.MustNewDuration(234 * time.Second),
TransmitConcurrency: ptr(uint32(456)),
ReaperFrequency: commoncfg.MustNewDuration(567 * time.Second),
ReaperMaxAge: commoncfg.MustNewDuration(678 * time.Hour),
},
VerboseLogging: ptr(true),
}
Expand Down Expand Up @@ -1379,6 +1381,8 @@ Protocol = 'grpc'
TransmitQueueMaxSize = 123
TransmitTimeout = '3m54s'
TransmitConcurrency = 456
ReaperFrequency = '9m27s'
ReaperMaxAge = '678h0m0s'
`},
{"full", full, fullTOML},
{"multi-chain", multiChain, multiChainTOML},
Expand Down
2 changes: 2 additions & 0 deletions core/services/chainlink/testdata/config-empty-effective.toml
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,8 @@ Protocol = 'wsrpc'
TransmitQueueMaxSize = 100000
TransmitTimeout = '5s'
TransmitConcurrency = 100
ReaperFrequency = '1h0m0s'
ReaperMaxAge = '48h0m0s'

[Capabilities]
[Capabilities.RateLimit]
Expand Down
2 changes: 2 additions & 0 deletions core/services/chainlink/testdata/config-full.toml
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,8 @@ Protocol = 'grpc'
TransmitQueueMaxSize = 123
TransmitTimeout = '3m54s'
TransmitConcurrency = 456
ReaperFrequency = '9m27s'
ReaperMaxAge = '678h0m0s'

[Capabilities]
[Capabilities.RateLimit]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,8 @@ Protocol = 'wsrpc'
TransmitQueueMaxSize = 100000
TransmitTimeout = '5s'
TransmitConcurrency = 100
ReaperFrequency = '1h0m0s'
ReaperMaxAge = '48h0m0s'

[Capabilities]
[Capabilities.RateLimit]
Expand Down
89 changes: 89 additions & 0 deletions core/services/llo/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@ package llo

import (
"context"
"database/sql"
"fmt"
"time"

"github.com/ethereum/go-ethereum/common"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"

"github.com/smartcontractkit/chainlink/v2/core/services/llo/mercurytransmitter"
)

Expand All @@ -26,3 +31,87 @@ func Cleanup(ctx context.Context, lp LogPoller, addr common.Address, donID uint3
}
return nil
}

type transmissionReaper struct {
services.Service
eng *services.Engine
ds sqlutil.DataSource
lggr logger.Logger
reapFreq time.Duration
maxAge time.Duration
}

// NewTransmissionReaper returns a new transmission reaper service
//
// In theory, if everything is working properly, there will never be stale
// transmissions. In practice there can be bugs, jobs that get deleted without
// proper cleanup etc. This acts as a sanity check to evict obviously stale
// entries from the llo_mercury_transmit_queue table.
func NewTransmissionReaper(ds sqlutil.DataSource, lggr logger.Logger, freq, maxAge time.Duration) services.Service {
t := &transmissionReaper{ds: ds, lggr: lggr, reapFreq: freq, maxAge: maxAge}
t.Service, t.eng = services.Config{
Name: "LLOTransmissionReaper",
Start: t.start,
}.NewServiceEngine(lggr)
return t
}

func (t *transmissionReaper) start(context.Context) error {
if t.reapFreq == 0 || t.maxAge == 0 {
t.eng.Debugw("Transmission reaper disabled", "reapFreq", t.reapFreq, "maxAge", t.maxAge)
return nil
}
t.eng.Go(t.runLoop)
return nil
}

func (t *transmissionReaper) runLoop(ctx context.Context) {
t.eng.Debugw("Transmission reaper running", "reapFreq", t.reapFreq, "maxAge", t.maxAge)
ticker := services.NewTicker(t.reapFreq)
defer ticker.Stop()
const batchSize = 10_000
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
for {
if n, err := t.reap(ctx, batchSize); err != nil {
t.lggr.Errorw("Failed to reap", "err", err)
select {
case <-ctx.Done():
return
case <-time.After(5 * time.Second):
continue
}
} else {
t.lggr.Debugw("Reaped stale transmissions", "nDeleted", n)
}
}
}
}
}

func (t *transmissionReaper) reap(ctx context.Context, batchSize int) (rowsDeleted int64, err error) {
for {
var res sql.Result
res, err = t.ds.ExecContext(ctx, `
DELETE FROM llo_mercury_transmit_queue
WHERE inserted_at < $1
LIMIT $2
`, time.Now().Add(-t.maxAge), batchSize)
if err != nil {
return rowsDeleted, fmt.Errorf("transmissionReaper: failed to delete stale transmissions: %w", err)
}
var rowsAffected int64
rowsAffected, err = res.RowsAffected()
if err != nil {
return rowsDeleted, fmt.Errorf("transmissionReaper: failed to get rows affected: %w", err)
}
if rowsAffected == 0 {
break
}
rowsDeleted += rowsAffected
}
return rowsDeleted, nil
}
44 changes: 40 additions & 4 deletions core/services/llo/cleanup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package llo

import (
"testing"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/assert"
Expand All @@ -14,9 +15,18 @@ import (
llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/llo/mercurytransmitter"
)

func makeSampleTransmissions(n int) []*mercurytransmitter.Transmission {
transmissions := make([]*mercurytransmitter.Transmission, n)
for i := 0; i < n; i++ {
transmissions[i] = makeSampleTransmission(uint64(i), "http://example.com/foo")

Check failure on line 25 in core/services/llo/cleanup_test.go

View workflow job for this annotation

GitHub Actions / GolangCI Lint (.)

G115: integer overflow conversion int -> uint64 (gosec)
}
return transmissions
}

func makeSampleTransmission(seqNr uint64, sURL string) *mercurytransmitter.Transmission {
return &mercurytransmitter.Transmission{
ServerURL: sURL,
Expand Down Expand Up @@ -85,18 +95,44 @@ func Test_Cleanup(t *testing.T) {
assert.NotNil(t, pd)
})
t.Run("removes transmissions", func(t *testing.T) {
trs, err := torm1.Get(ctx, srvURL1)
trs, err := torm1.Get(ctx, srvURL1, 10)
require.NoError(t, err)
assert.Len(t, trs, 0)
trs, err = torm1.Get(ctx, srvURL2)
trs, err = torm1.Get(ctx, srvURL2, 10)
require.NoError(t, err)
assert.Len(t, trs, 0)

trs, err = torm2.Get(ctx, srvURL1)
trs, err = torm2.Get(ctx, srvURL1, 10)
require.NoError(t, err)
assert.Len(t, trs, 1)
trs, err = torm2.Get(ctx, srvURL2)
trs, err = torm2.Get(ctx, srvURL2, 10)
require.NoError(t, err)
assert.Len(t, trs, 1)
})
}

func Test_TransmissionReaper(t *testing.T) {
ds := pgtest.NewSqlxDB(t)
lggr := logger.TestLogger(t)
tr := &transmissionReaper{ds: ds, lggr: lggr, maxAge: 24 * time.Hour}
ctx := testutils.Context(t)

const n = 13

transmissions := makeSampleTransmissions(n)
torm := mercurytransmitter.NewORM(ds, 1)
err := torm.Insert(testutils.Context(t), transmissions)
require.NoError(t, err)
pgtest.MustExec(t, ds, "UPDATE mercury_transmit_requests SET created_at = NOW() - INTERVAL '48 hours' LIMIT 5")

// test batching
d, err := tr.reap(ctx, n/3)
require.NoError(t, err)
assert.Equal(t, int64(5), d)

pgtest.MustExec(t, ds, "UPDATE mercury_transmit_requests SET created_at = NOW() - INTERVAL '48 hours'")

d, err = tr.reap(ctx, n/3)
require.NoError(t, err)
assert.Equal(t, int64(n-5), d)
}
10 changes: 5 additions & 5 deletions core/services/llo/mercurytransmitter/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ func makeSampleTransmission(seqNr uint64) *Transmission {
}
}

func makeSampleTransmissions() []*Transmission {
return []*Transmission{
makeSampleTransmission(1001),
makeSampleTransmission(1002),
makeSampleTransmission(1003),
func makeSampleTransmissions(n int) []*Transmission {
transmissions := make([]*Transmission, n)
for i := 0; i < n; i++ {
transmissions[i] = makeSampleTransmission(uint64(i))

Check failure on line 44 in core/services/llo/mercurytransmitter/helpers_test.go

View workflow job for this annotation

GitHub Actions / GolangCI Lint (.)

G115: integer overflow conversion int -> uint64 (gosec)
}
return transmissions
}
Loading

0 comments on commit a32255c

Please sign in to comment.