From cea0c29cae4592b8bb3bf668384e0b9bc14642ae Mon Sep 17 00:00:00 2001 From: andyzhang2023 Date: Tue, 10 Dec 2024 14:50:10 +0800 Subject: [PATCH 1/6] opt truncatePending and truncateQueue --- core/txpool/legacypool/legacypool.go | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index 202f495dc..d7aeac2b7 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -270,6 +270,9 @@ type LegacyPool struct { all *lookup // All transactions to allow lookups priced *pricedList // All transactions sorted by price + pendingCounter int + queueCounter int + pendingCache *cacheForMiner //pending list cache for miner reqResetCh chan *txpoolResetRequest @@ -1004,6 +1007,7 @@ func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, local } else { // Nothing was replaced, bump the queued counter queuedGauge.Inc(1) + pool.queueCounter++ } // If the transaction isn't in lookup set but it's expected to be there, // show the error log. @@ -1062,6 +1066,7 @@ func (pool *LegacyPool) promoteTx(addr common.Address, hash common.Hash, tx *typ pool.pendingCache.del([]*types.Transaction{old}, pool.signer) } else { // Nothing was replaced, bump the pending counter + pool.pendingCounter++ pendingGauge.Inc(1) } // Set the potentially new pending nonce and notify any subsystems of the new tx @@ -1291,6 +1296,7 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bo // Reduce the pending counter pool.pendingCache.del(append(invalids, tx), pool.signer) pendingGauge.Dec(int64(1 + len(invalids))) + pool.pendingCounter -= 1 + len(invalids) return 1 + len(invalids) } } @@ -1299,6 +1305,7 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bo if removed, _ := future.Remove(tx); removed { // Reduce the queued counter queuedGauge.Dec(1) + pool.queueCounter -= 1 } if future.Empty() { delete(pool.queue, addr) @@ -1710,6 +1717,7 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T } log.Trace("Promoted queued transactions", "count", len(promoted)) queuedGauge.Dec(int64(len(readies))) + pool.queueCounter -= len(readies) // Drop all transactions over the allowed limit var caps types.Transactions @@ -1725,6 +1733,7 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T // Mark all the items dropped as removed pool.priced.Removed(len(forwards) + len(drops) + len(caps)) queuedGauge.Dec(int64(len(forwards) + len(drops) + len(caps))) + pool.queueCounter -= len(forwards) + len(drops) + len(caps) if pool.locals.contains(addr) { localGauge.Dec(int64(len(forwards) + len(drops) + len(caps))) } @@ -1744,10 +1753,7 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T // pending limit. The algorithm tries to reduce transaction counts by an approximately // equal number for all for accounts with many pending transactions. func (pool *LegacyPool) truncatePending() { - pending := uint64(0) - for _, list := range pool.pending { - pending += uint64(list.Len()) - } + pending := uint64(pool.pendingCounter) if pending <= pool.config.GlobalSlots { return } @@ -1792,6 +1798,7 @@ func (pool *LegacyPool) truncatePending() { pool.priced.Removed(len(caps)) dropPendingCache = append(dropPendingCache, caps...) pendingGauge.Dec(int64(len(caps))) + pool.pendingCounter -= len(caps) if pool.locals.contains(offenders[i]) { localGauge.Dec(int64(len(caps))) } @@ -1820,6 +1827,7 @@ func (pool *LegacyPool) truncatePending() { dropPendingCache = append(dropPendingCache, caps...) pool.priced.Removed(len(caps)) pendingGauge.Dec(int64(len(caps))) + pool.pendingCounter -= len(caps) if pool.locals.contains(addr) { localGauge.Dec(int64(len(caps))) } @@ -1833,10 +1841,7 @@ func (pool *LegacyPool) truncatePending() { // truncateQueue drops the oldest transactions in the queue if the pool is above the global queue limit. func (pool *LegacyPool) truncateQueue() { - queued := uint64(0) - for _, list := range pool.queue { - queued += uint64(list.Len()) - } + queued := uint64(pool.queueCounter) if queued <= pool.config.GlobalQueue { return } @@ -1932,6 +1937,7 @@ func (pool *LegacyPool) demoteUnexecutables(demoteAddrs []common.Address) { dropPendingCache = append(dropPendingCache, invalids...) dropPendingCache = append(dropPendingCache, drops...) pendingGauge.Dec(int64(len(olds) + len(drops) + len(invalids))) + pool.pendingCounter -= len(olds) + len(drops) + len(invalids) if pool.locals.contains(addr) { localGauge.Dec(int64(len(olds) + len(drops) + len(invalids))) } @@ -1947,6 +1953,7 @@ func (pool *LegacyPool) demoteUnexecutables(demoteAddrs []common.Address) { } dropPendingCache = append(dropPendingCache, gapped...) pendingGauge.Dec(int64(len(gapped))) + pool.pendingCounter -= len(gapped) } // Delete the entire pending entry if it became empty. if list.Empty() { From 1168abd138fb8f190470ac7ff285266b68e16208 Mon Sep 17 00:00:00 2001 From: andyzhang2023 <147463846+andyzhang2023@users.noreply.github.com> Date: Tue, 7 Jan 2025 15:43:56 +0800 Subject: [PATCH 2/6] Txpool optimization: disable cache if --mine is not enabled (#245) Co-authored-by: Owen <103096885+owen-reorg@users.noreply.github.com> Co-authored-by: andyzhang2023 --- accounts/abi/bind/util_test.go | 3 - cmd/geth/config.go | 4 + core/txpool/legacypool/cache_for_miner.go | 63 +++---------- core/txpool/legacypool/legacypool.go | 91 ++++++++++--------- core/txpool/legacypool/legacypool_test.go | 26 +++++- core/txpool/legacypool/nonecache_for_miner.go | 52 +++++++++++ ethclient/simulated/backend.go | 2 - ethclient/simulated/backend_test.go | 1 - miner/worker.go | 2 +- 9 files changed, 144 insertions(+), 100 deletions(-) create mode 100644 core/txpool/legacypool/nonecache_for_miner.go diff --git a/accounts/abi/bind/util_test.go b/accounts/abi/bind/util_test.go index 87917d43f..592465f2a 100644 --- a/accounts/abi/bind/util_test.go +++ b/accounts/abi/bind/util_test.go @@ -83,7 +83,6 @@ func TestWaitDeployed(t *testing.T) { // Send and mine the transaction. backend.Client().SendTransaction(ctx, tx) - time.Sleep(500 * time.Millisecond) //wait for the tx to be mined backend.Commit() select { @@ -118,7 +117,6 @@ func TestWaitDeployedCornerCases(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() backend.Client().SendTransaction(ctx, tx) - time.Sleep(500 * time.Millisecond) //wait for the tx to be mined backend.Commit() notContractCreation := errors.New("tx is not contract creation") if _, err := bind.WaitDeployed(ctx, backend.Client(), tx); err.Error() != notContractCreation.Error() { @@ -137,6 +135,5 @@ func TestWaitDeployedCornerCases(t *testing.T) { }() backend.Client().SendTransaction(ctx, tx) - time.Sleep(500 * time.Millisecond) //wait for the tx to be mined cancel() } diff --git a/cmd/geth/config.go b/cmd/geth/config.go index 27496113d..da797c727 100644 --- a/cmd/geth/config.go +++ b/cmd/geth/config.go @@ -219,6 +219,10 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) { cfg.Eth.OverrideVerkle = &v } + if ctx.Bool(utils.MiningEnabledFlag.Name) { + cfg.Eth.TxPool.EnableCache = true + } + backend, eth := utils.RegisterEthService(stack, &cfg.Eth) // Create gauge with geth system and build information diff --git a/core/txpool/legacypool/cache_for_miner.go b/core/txpool/legacypool/cache_for_miner.go index e8cedaa90..892d79739 100644 --- a/core/txpool/legacypool/cache_for_miner.go +++ b/core/txpool/legacypool/cache_for_miner.go @@ -5,10 +5,8 @@ import ( "sync" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/metrics" - "github.com/holiman/uint256" ) var ( @@ -16,24 +14,26 @@ var ( localCacheGauge = metrics.NewRegisteredGauge("txpool/legacypool/local/cache", nil) ) +type pendingCache interface { + add(types.Transactions, types.Signer) + del(types.Transactions, types.Signer) + dump() map[common.Address]types.Transactions + markLocal(common.Address) + flattenLocals() []common.Address +} + // copy of pending transactions type cacheForMiner struct { txLock sync.Mutex pending map[common.Address]map[*types.Transaction]struct{} locals map[common.Address]bool addrLock sync.Mutex - - allCache map[common.Address][]*txpool.LazyTransaction - filteredCache map[common.Address][]*txpool.LazyTransaction - cacheLock sync.Mutex } func newCacheForMiner() *cacheForMiner { return &cacheForMiner{ - pending: make(map[common.Address]map[*types.Transaction]struct{}), - locals: make(map[common.Address]bool), - allCache: make(map[common.Address][]*txpool.LazyTransaction), - filteredCache: make(map[common.Address][]*txpool.LazyTransaction), + pending: make(map[common.Address]map[*types.Transaction]struct{}), + locals: make(map[common.Address]bool), } } @@ -75,9 +75,8 @@ func (pc *cacheForMiner) del(txs types.Transactions, signer types.Signer) { } } -func (pc *cacheForMiner) sync2cache(pool txpool.LazyResolver, filter func(txs types.Transactions, addr common.Address) types.Transactions) { +func (pc *cacheForMiner) dump() map[common.Address]types.Transactions { pending := make(map[common.Address]types.Transactions) - pc.txLock.Lock() for addr, txlist := range pc.pending { pending[addr] = make(types.Transactions, 0, len(txlist)) @@ -86,46 +85,10 @@ func (pc *cacheForMiner) sync2cache(pool txpool.LazyResolver, filter func(txs ty } } pc.txLock.Unlock() - - // convert pending to lazyTransactions - filteredLazy := make(map[common.Address][]*txpool.LazyTransaction) - allLazy := make(map[common.Address][]*txpool.LazyTransaction) - for addr, txs := range pending { + for _, txs := range pending { // sorted by nonce sort.Sort(types.TxByNonce(txs)) - filterd := filter(txs, addr) - if len(txs) > 0 { - lazies := make([]*txpool.LazyTransaction, len(txs)) - for i, tx := range txs { - lazies[i] = &txpool.LazyTransaction{ - Pool: pool, - Hash: tx.Hash(), - Tx: tx, - Time: tx.Time(), - GasFeeCap: uint256.MustFromBig(tx.GasFeeCap()), - GasTipCap: uint256.MustFromBig(tx.GasTipCap()), - Gas: tx.Gas(), - BlobGas: tx.BlobGas(), - } - } - allLazy[addr] = lazies - filteredLazy[addr] = lazies[:len(filterd)] - } - } - - pc.cacheLock.Lock() - pc.filteredCache = filteredLazy - pc.allCache = allLazy - pc.cacheLock.Unlock() -} - -func (pc *cacheForMiner) dump(filtered bool) map[common.Address][]*txpool.LazyTransaction { - pc.cacheLock.Lock() - pending := pc.allCache - if filtered { - pending = pc.filteredCache } - pc.cacheLock.Unlock() return pending } @@ -136,7 +99,7 @@ func (pc *cacheForMiner) markLocal(addr common.Address) { pc.locals[addr] = true } -func (pc *cacheForMiner) IsLocal(addr common.Address) bool { +func (pc *cacheForMiner) isLocal(addr common.Address) bool { pc.addrLock.Lock() defer pc.addrLock.Unlock() return pc.locals[addr] diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index 202f495dc..414d1b160 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -153,6 +153,8 @@ type BlockChain interface { // Config are the configuration parameters of the transaction pool. type Config struct { + EnableCache bool // enable pending cache for mining. Set as true only --mine option is enabled + Locals []common.Address // Addresses that should be treated by default as local NoLocals bool // Whether local transaction handling should be disabled Journal string // Journal of local transactions to survive node restarts @@ -236,6 +238,12 @@ func (config *Config) sanitize() Config { log.Warn("Sanitizing invalid txpool reannounce time", "provided", conf.ReannounceTime, "updated", time.Minute) conf.ReannounceTime = time.Minute } + // log to inform user if the cache is enabled or not + if conf.EnableCache { + log.Info("legacytxpool Pending Cache is enabled") + } else { + log.Info("legacytxpool Pending Cache is disabled") + } return conf } @@ -270,7 +278,7 @@ type LegacyPool struct { all *lookup // All transactions to allow lookups priced *pricedList // All transactions sorted by price - pendingCache *cacheForMiner //pending list cache for miner + pendingCache pendingCache //pending list cache for miner reqResetCh chan *txpoolResetRequest reqPromoteCh chan *accountSet @@ -313,6 +321,9 @@ func New(config Config, chain BlockChain) *LegacyPool { initDoneCh: make(chan struct{}), pendingCache: newCacheForMiner(), } + if !config.EnableCache { + pool.pendingCache = newNoneCacheForMiner(pool) + } pool.locals = newAccountSet(pool.signer) for _, addr := range config.Locals { log.Info("Setting new local account", "address", addr) @@ -349,9 +360,6 @@ func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserve txpool.A // Set the basic pool parameters pool.gasTip.Store(uint256.NewInt(gasTip)) - // set dumper - pool.pendingCache.sync2cache(pool, pool.createFilter(pool.gasTip.Load().ToBig(), head.BaseFee)) - // Initialize the state with head block, or fallback to empty one in // case the head state is not available (might occur when node is not // fully synced). @@ -386,27 +394,9 @@ func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserve txpool.A } pool.wg.Add(1) go pool.loop() - go pool.loopOfSync() return nil } -func (pool *LegacyPool) loopOfSync() { - ticker := time.NewTicker(400 * time.Millisecond) - for { - select { - case <-pool.reorgShutdownCh: - return - case <-ticker.C: - gasTip := pool.gasTip.Load() - currHead := pool.currentHead.Load() - if gasTip == nil || currHead == nil { - continue - } - pool.pendingCache.sync2cache(pool, pool.createFilter(gasTip.ToBig(), currHead.BaseFee)) - } - } -} - // loop is the transaction pool's main event loop, waiting for and reacting to // outside blockchain events as well as for various reporting and transaction // eviction events. @@ -645,35 +635,56 @@ func (pool *LegacyPool) ContentFrom(addr common.Address) ([]*types.Transaction, // The transactions can also be pre-filtered by the dynamic fee components to // reduce allocations and load on downstream subsystems. func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction { - empty := txpool.PendingFilter{} - if filter == empty { - // return all pending transactions, no filtering - return pool.pendingCache.dump(false) - } + defer func(t0 time.Time) { + getPendingDurationTimer.Update(time.Since(t0)) + }(time.Now()) // If only blob transactions are requested, this pool is unsuitable as it // contains none, don't even bother. if filter.OnlyBlobTxs { return nil } - defer func(t0 time.Time) { - getPendingDurationTimer.Update(time.Since(t0)) - }(time.Now()) - // It is a bit tricky here, we don't do the filtering here. - return pool.pendingCache.dump(true) -} -func (pool *LegacyPool) createFilter(gasPrice, baseFee *big.Int) func(txs types.Transactions, addr common.Address) types.Transactions { - return func(txs types.Transactions, addr common.Address) types.Transactions { - if !pool.pendingCache.IsLocal(addr) { + // Convert the new uint256.Int types to the old big.Int ones used by the legacy pool + var ( + minTipBig *big.Int + baseFeeBig *big.Int + ) + if filter.MinTip != nil { + minTipBig = filter.MinTip.ToBig() + } + if filter.BaseFee != nil { + baseFeeBig = filter.BaseFee.ToBig() + } + pending := make(map[common.Address][]*txpool.LazyTransaction, len(pool.pending)) + for addr, txs := range pool.pendingCache.dump() { + + // If the miner requests tip enforcement, cap the lists now + if minTipBig != nil && !pool.locals.contains(addr) { for i, tx := range txs { - if tx.EffectiveGasTipIntCmp(gasPrice, baseFee) < 0 { + if tx.EffectiveGasTipIntCmp(minTipBig, baseFeeBig) < 0 { txs = txs[:i] break } } } - return txs + if len(txs) > 0 { + lazies := make([]*txpool.LazyTransaction, len(txs)) + for i := 0; i < len(txs); i++ { + lazies[i] = &txpool.LazyTransaction{ + Pool: pool, + Hash: txs[i].Hash(), + Tx: txs[i], + Time: txs[i].Time(), + GasFeeCap: uint256.MustFromBig(txs[i].GasFeeCap()), + GasTipCap: uint256.MustFromBig(txs[i].GasTipCap()), + Gas: txs[i].Gas(), + BlobGas: txs[i].BlobGas(), + } + } + pending[addr] = lazies + } } + return pending } // Locals retrieves the accounts currently considered local by the pool. @@ -1469,10 +1480,6 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, pool.priced.SetBaseFee(pendingBaseFee) } } - gasTip, baseFee := pool.gasTip.Load(), pendingBaseFee - go func() { - pool.pendingCache.sync2cache(pool, pool.createFilter(gasTip.ToBig(), baseFee)) - }() // Update all accounts to the latest known pending nonce nonces := make(map[common.Address]uint64, len(pool.pending)) for addr, list := range pool.pending { diff --git a/core/txpool/legacypool/legacypool_test.go b/core/txpool/legacypool/legacypool_test.go index 6b900ed2c..b587f3676 100644 --- a/core/txpool/legacypool/legacypool_test.go +++ b/core/txpool/legacypool/legacypool_test.go @@ -47,6 +47,11 @@ var ( // sideeffects used during testing. testTxPoolConfig Config + // + testTxPoolConfigEnableCache Config + + enableCache bool + // eip1559Config is a chain config with EIP-1559 enabled at block 0. eip1559Config *params.ChainConfig ) @@ -55,6 +60,10 @@ func init() { testTxPoolConfig = DefaultConfig testTxPoolConfig.Journal = "" + testTxPoolConfigEnableCache = DefaultConfig + testTxPoolConfigEnableCache.Journal = "" + testTxPoolConfigEnableCache.EnableCache = true + cpy := *params.TestChainConfig eip1559Config = &cpy eip1559Config.BerlinBlock = common.Big0 @@ -163,7 +172,12 @@ func setupPoolWithConfig(config *params.ChainConfig) (*LegacyPool, *ecdsa.Privat blockchain := newTestBlockChain(config, 10000000, statedb, new(event.Feed)) key, _ := crypto.GenerateKey() - pool := New(testTxPoolConfig, blockchain) + var pool *LegacyPool + if enableCache { + pool = New(testTxPoolConfigEnableCache, blockchain) + } else { + pool = New(testTxPoolConfig, blockchain) + } if err := pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()); err != nil { panic(err) } @@ -1534,12 +1548,22 @@ func TestMinGasPriceEnforced(t *testing.T) { } } +func TestRepricingDynamicFeeEnableCache(t *testing.T) { + enableCache = true + repricingDynamicFee(t) + enableCache = false +} + // Tests that setting the transaction pool gas price to a higher value correctly // discards everything cheaper (legacy & dynamic fee) than that and moves any // gapped transactions back from the pending pool to the queue. // // Note, local transactions are never allowed to be dropped. func TestRepricingDynamicFee(t *testing.T) { + repricingDynamicFee(t) +} + +func repricingDynamicFee(t *testing.T) { t.Parallel() // Create the pool to test the pricing enforcement with diff --git a/core/txpool/legacypool/nonecache_for_miner.go b/core/txpool/legacypool/nonecache_for_miner.go new file mode 100644 index 000000000..dd5ed2183 --- /dev/null +++ b/core/txpool/legacypool/nonecache_for_miner.go @@ -0,0 +1,52 @@ +package legacypool + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" +) + +var ( + _ pendingCache = (*noneCacheForMiner)(nil) +) + +type noneCacheForMiner struct { + pool *LegacyPool +} + +func newNoneCacheForMiner(pool *LegacyPool) *noneCacheForMiner { + return &noneCacheForMiner{pool: pool} +} + +func (nc *noneCacheForMiner) add(txs types.Transactions, signer types.Signer) { + // do nothing +} + +func (nc *noneCacheForMiner) del(txs types.Transactions, signer types.Signer) { + // do nothing +} + +func (nc *noneCacheForMiner) dump() map[common.Address]types.Transactions { + // dump all pending transactions from the pool + nc.pool.mu.RLock() + defer nc.pool.mu.RUnlock() + pending := make(map[common.Address]types.Transactions) + for addr, txlist := range nc.pool.pending { + pending[addr] = txlist.Flatten() + } + return pending +} + +func (nc *noneCacheForMiner) markLocal(addr common.Address) { + // do nothing +} + +func (nc *noneCacheForMiner) flattenLocals() []common.Address { + // return a copy of pool.locals + nc.pool.mu.RLock() + defer nc.pool.mu.RUnlock() + var locals []common.Address = make([]common.Address, 0, len(nc.pool.locals.accounts)) + for addr := range nc.pool.locals.accounts { + locals = append(locals, addr) + } + return locals +} diff --git a/ethclient/simulated/backend.go b/ethclient/simulated/backend.go index 5c137d407..1df0a7315 100644 --- a/ethclient/simulated/backend.go +++ b/ethclient/simulated/backend.go @@ -178,8 +178,6 @@ func (n *Backend) Close() error { // Commit seals a block and moves the chain forward to a new empty block. func (n *Backend) Commit() common.Hash { - // wait for the transactions to be sync into cache - time.Sleep(350 * time.Millisecond) return n.beacon.Commit() } diff --git a/ethclient/simulated/backend_test.go b/ethclient/simulated/backend_test.go index 9307e2105..a8fd7913c 100644 --- a/ethclient/simulated/backend_test.go +++ b/ethclient/simulated/backend_test.go @@ -214,7 +214,6 @@ func TestForkResendTx(t *testing.T) { t.Fatalf("could not create transaction: %v", err) } client.SendTransaction(ctx, tx) - time.Sleep(1 * time.Second) sim.Commit() // 3. diff --git a/miner/worker.go b/miner/worker.go index 5cf742bcd..3ea2bfc76 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -1296,7 +1296,7 @@ func (w *worker) fillTransactions(interrupt *atomic.Int32, env *environment) err pendingBlobTxs := w.eth.TxPool().Pending(filter) packFromTxpoolTimer.UpdateSince(start) - log.Debug("packFromTxpoolTimer", "duration", common.PrettyDuration(time.Since(start)), "hash", env.header.Hash(), "txs", len(pendingPlainTxs)) + log.Debug("packFromTxpoolTimer", "duration", common.PrettyDuration(time.Since(start)), "hash", env.header.Hash()) // Split the pending transactions into locals and remotes. localPlainTxs, remotePlainTxs := make(map[common.Address][]*txpool.LazyTransaction), pendingPlainTxs From 9c779d585676c64d047d9d81b8f7298aead624f9 Mon Sep 17 00:00:00 2001 From: andyzhang2023 <147463846+andyzhang2023@users.noreply.github.com> Date: Thu, 9 Jan 2025 16:25:11 +0800 Subject: [PATCH 3/6] Txpool optimization: broadcast transaction body directly to peer (#243) Co-authored-by: Owen <103096885+owen-reorg@users.noreply.github.com> Co-authored-by: andyzhang2023 --- eth/backend.go | 4 +++- eth/handler.go | 39 +++++++++++++++++++++++++++++++++++++-- internal/ethapi/api.go | 4 ++-- p2p/server.go | 3 +++ 4 files changed, 45 insertions(+), 5 deletions(-) diff --git a/eth/backend.go b/eth/backend.go index ff5926a18..380498c52 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -21,12 +21,13 @@ import ( "context" "errors" "fmt" - "github.com/ethereum/go-ethereum/core/txpool/bundlepool" "math/big" "runtime" "sync" "time" + "github.com/ethereum/go-ethereum/core/txpool/bundlepool" + "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" @@ -324,6 +325,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { // Permit the downloader to use the trie cache allowance during fast sync cacheLimit := cacheConfig.TrieCleanLimit + cacheConfig.TrieDirtyLimit + cacheConfig.SnapshotLimit if eth.handler, err = newHandler(&handlerConfig{ + DirectNodes: stack.Config().P2P.DirectNodes, Database: chainDb, Chain: eth.blockchain, TxPool: eth.txPool, diff --git a/eth/handler.go b/eth/handler.go index db8f0ed5c..bcb56d5b1 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -41,6 +41,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/triedb/pathdb" ) @@ -88,6 +89,7 @@ type txPool interface { // handlerConfig is the collection of initialization parameters to create a full // node network handler. type handlerConfig struct { + DirectNodes []*enode.Node Database ethdb.Database // Database for direct sync insertions Chain *core.BlockChain // Blockchain to serve data from TxPool txPool // Transaction pool to propagate from @@ -137,6 +139,8 @@ type handler struct { handlerStartCh chan struct{} handlerDoneCh chan struct{} + + directNodes map[string]struct{} } // newHandler returns a handler for all Ethereum chain management protocol. @@ -159,7 +163,12 @@ func newHandler(config *handlerConfig) (*handler, error) { quitSync: make(chan struct{}), handlerDoneCh: make(chan struct{}), handlerStartCh: make(chan struct{}), + directNodes: make(map[string]struct{}), + } + for _, node := range config.DirectNodes { + h.directNodes[node.ID().String()] = struct{}{} } + if config.Sync == downloader.FullSync { // The database seems empty as the current block is the genesis. Yet the snap // block is ahead, so snap sync was enabled for this node at a certain point. @@ -620,6 +629,29 @@ func (h *handler) BroadcastBlock(block *types.Block, propagate bool) { } } +func (h *handler) peersForBroadcasting(numDirect int, peers []*ethPeer) (direct []*ethPeer, announce []*ethPeer) { + // Split the peers into direct-peers and announce-peers + // we send the tx directly to direct-peers + // we announce the tx to announce-peers + direct = make([]*ethPeer, 0, numDirect) + announce = make([]*ethPeer, 0, len(peers)-numDirect) + for _, peer := range peers { + if _, ok := h.directNodes[peer.ID()]; ok { + direct = append(direct, peer) + } else { + announce = append(announce, peer) + } + } + + // if directly-peers are not enough, move some announce-peers into directly pool + for len(direct) < numDirect && len(announce) > 0 { + // shift one peer to trusted + direct = append(direct, announce[0]) + announce = announce[1:] + } + return direct, announce +} + // BroadcastTransactions will propagate a batch of transactions // - To a square root of all peers for non-blob transactions // - And, separately, as announcements to all peers which are not known to @@ -642,6 +674,7 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) { peers := h.peers.peersWithoutTransaction(tx.Hash()) var numDirect int + var direct, announce []*ethPeer = nil, peers switch { case tx.Type() == types.BlobTxType: blobTxs++ @@ -649,14 +682,16 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) { largeTxs++ default: numDirect = int(math.Sqrt(float64(len(peers)))) + direct, announce = h.peersForBroadcasting(numDirect, peers) } + // Send the tx unconditionally to a subset of our peers - for _, peer := range peers[:numDirect] { + for _, peer := range direct { txset[peer] = append(txset[peer], tx.Hash()) log.Trace("Broadcast transaction", "peer", peer.ID(), "hash", tx.Hash()) } // For the remaining peers, send announcement only - for _, peer := range peers[numDirect:] { + for _, peer := range announce { annos[peer] = append(annos[peer], tx.Hash()) log.Trace("Announce transaction", "peer", peer.ID(), "hash", tx.Hash()) } diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index 02fb0fdb7..c3cecd051 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -2000,9 +2000,9 @@ func SubmitTransaction(ctx context.Context, b Backend, tx *types.Transaction) (c if tx.To() == nil { addr := crypto.CreateAddress(from, tx.Nonce()) - log.Info("Submitted contract creation", "hash", tx.Hash().Hex(), "from", from, "nonce", tx.Nonce(), "contract", addr.Hex(), "value", tx.Value()) + log.Debug("Submitted contract creation", "hash", tx.Hash().Hex(), "from", from, "nonce", tx.Nonce(), "contract", addr.Hex(), "value", tx.Value()) } else { - log.Info("Submitted transaction", "hash", tx.Hash().Hex(), "from", from, "nonce", tx.Nonce(), "recipient", tx.To(), "value", tx.Value()) + log.Debug("Submitted transaction", "hash", tx.Hash().Hex(), "from", from, "nonce", tx.Nonce(), "recipient", tx.To(), "value", tx.Value()) } return tx.Hash(), nil } diff --git a/p2p/server.go b/p2p/server.go index b398c51eb..af036a251 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -116,6 +116,9 @@ type Config struct { // maintained and re-connected on disconnects. StaticNodes []*enode.Node + // Direct nodes are static nodes who will always reveived transactions body rather than just the hashes. + DirectNodes []*enode.Node + // Trusted nodes are used as pre-configured connections which are always // allowed to connect, even above the peer limit. TrustedNodes []*enode.Node From cb544554d9216a69c64a2c66dd817af250033244 Mon Sep 17 00:00:00 2001 From: andyzhang2023 <147463846+andyzhang2023@users.noreply.github.com> Date: Thu, 9 Jan 2025 16:25:54 +0800 Subject: [PATCH 4/6] Txpool optimization: filter out staled transactions of "nonce too low" when providing all pending list to miner (#244) Co-authored-by: Owen <103096885+owen-reorg@users.noreply.github.com> Co-authored-by: andyzhang2023 --- core/txpool/legacypool/legacypool.go | 29 ++++++++++++++++++++++++++++ miner/worker_builder.go | 9 +++++---- 2 files changed, 34 insertions(+), 4 deletions(-) diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index 2c4b5f651..d69f6fe9d 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -651,7 +651,15 @@ func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address] var ( minTipBig *big.Int baseFeeBig *big.Int + + blockNumber uint64 = 0 + blockHash common.Hash = common.Hash{} + nonceTooLowCount = 0 + staled = make(map[common.Hash]struct{}) ) + defer func() { + log.Debug("perf-trace txpool-trace Pending() nonce too low", "blockNumber", blockNumber, "blockHash", blockHash, "nonceTooLowCount", nonceTooLowCount, "staled", len(staled)) + }() if filter.MinTip != nil { minTipBig = filter.MinTip.ToBig() } @@ -659,7 +667,28 @@ func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address] baseFeeBig = filter.BaseFee.ToBig() } pending := make(map[common.Address][]*txpool.LazyTransaction, len(pool.pending)) + if currHeader := pool.chain.CurrentBlock(); currHeader != nil { + blockNumber = currHeader.Number.Uint64() + blockHash = currHeader.Hash() + currBlock := pool.chain.GetBlock(blockHash, currHeader.Number.Uint64()) + staled = make(map[common.Hash]struct{}, len(currBlock.Transactions())) + for _, tx := range currBlock.Transactions() { + staled[tx.Hash()] = struct{}{} + } + } for addr, txs := range pool.pendingCache.dump() { + // remove nonce too low transactions + if len(staled) > 0 { + noncetoolow := -1 + for i, tx := range txs { + if _, hit := staled[tx.Hash()]; !hit { + break + } + noncetoolow = i + } + nonceTooLowCount += noncetoolow + 1 + txs = txs[noncetoolow+1:] + } // If the miner requests tip enforcement, cap the lists now if minTipBig != nil && !pool.locals.contains(addr) { diff --git a/miner/worker_builder.go b/miner/worker_builder.go index 61a5672c2..2055f67cc 100644 --- a/miner/worker_builder.go +++ b/miner/worker_builder.go @@ -2,15 +2,16 @@ package miner import ( "errors" - mapset "github.com/deckarep/golang-set/v2" - "github.com/ethereum/go-ethereum/consensus/misc/eip4844" - "github.com/holiman/uint256" "math/big" "slices" "sync" "sync/atomic" "time" + mapset "github.com/deckarep/golang-set/v2" + "github.com/ethereum/go-ethereum/consensus/misc/eip4844" + "github.com/holiman/uint256" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/state" @@ -78,7 +79,7 @@ func (w *worker) fillTransactionsAndBundles(interrupt *atomic.Int32, env *enviro pendingBlobTxs := w.eth.TxPool().Pending(filter) packFromTxpoolTimer.UpdateSince(start) - log.Debug("packFromTxpoolTimer", "duration", common.PrettyDuration(time.Since(start)), "hash", env.header.Hash()) + log.Debug("packFromTxpoolTimer", "duration", common.PrettyDuration(time.Since(start)), "hash", env.header.Hash(), "txs", len(pendingPlainTxs)) // Split the pending transactions into locals and remotes. localPlainTxs, remotePlainTxs := make(map[common.Address][]*txpool.LazyTransaction), pendingPlainTxs From 3474c4724e929f2006cb591c890798970e38a32d Mon Sep 17 00:00:00 2001 From: andyzhang2023 <147463846+andyzhang2023@users.noreply.github.com> Date: Thu, 9 Jan 2025 16:26:37 +0800 Subject: [PATCH 5/6] Txpool opt async priced (#246) Co-authored-by: andyzhang2023 --- cmd/geth/main.go | 1 + cmd/utils/flags.go | 9 + core/txpool/legacypool/async_priced_list.go | 192 +++++++++++++ core/txpool/legacypool/legacypool.go | 31 +- core/txpool/legacypool/legacypool_test.go | 303 +++++++++++++++++++- core/txpool/legacypool/list.go | 38 ++- miner/worker.go | 2 +- 7 files changed, 553 insertions(+), 23 deletions(-) create mode 100644 core/txpool/legacypool/async_priced_list.go diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 784be6d7f..e45efc9e2 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -79,6 +79,7 @@ var ( utils.TxPoolRejournalFlag, utils.TxPoolPriceLimitFlag, utils.TxPoolPriceBumpFlag, + utils.TxPoolEnableAsyncPricedFlag, utils.TxPoolAccountSlotsFlag, utils.TxPoolGlobalSlotsFlag, utils.TxPoolAccountQueueFlag, diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index ac46716bc..ef009ac3b 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -410,6 +410,12 @@ var ( Value: ethconfig.Defaults.TxPool.PriceBump, Category: flags.TxPoolCategory, } + TxPoolEnableAsyncPricedFlag = &cli.BoolFlag{ + Name: "txpool.asyncpriced", + Usage: "enable async-priced-sorted list for txpool", + Value: false, + Category: flags.TxPoolCategory, + } TxPoolAccountSlotsFlag = &cli.Uint64Flag{ Name: "txpool.accountslots", Usage: "Minimum number of executable transaction slots guaranteed per account", @@ -1723,6 +1729,9 @@ func setTxPool(ctx *cli.Context, cfg *legacypool.Config) { if ctx.IsSet(TxPoolPriceBumpFlag.Name) { cfg.PriceBump = ctx.Uint64(TxPoolPriceBumpFlag.Name) } + if ctx.IsSet(TxPoolEnableAsyncPricedFlag.Name) { + cfg.EnableAsyncPriced = ctx.Bool(TxPoolEnableAsyncPricedFlag.Name) + } if ctx.IsSet(TxPoolAccountSlotsFlag.Name) { cfg.AccountSlots = ctx.Uint64(TxPoolAccountSlotsFlag.Name) } diff --git a/core/txpool/legacypool/async_priced_list.go b/core/txpool/legacypool/async_priced_list.go new file mode 100644 index 000000000..9ac5c3124 --- /dev/null +++ b/core/txpool/legacypool/async_priced_list.go @@ -0,0 +1,192 @@ +package legacypool + +import ( + "math/big" + "sync" + "sync/atomic" + + "github.com/ethereum/go-ethereum/core/types" +) + +var _ pricedListInterface = &asyncPricedList{} + +type addEvent struct { + tx *types.Transaction + local bool +} + +type asyncPricedList struct { + priced *pricedList + floatingLowest atomic.Value + urgentLowest atomic.Value + baseFee atomic.Value + mu sync.Mutex + + // events + quit chan struct{} + reheap chan struct{} + add chan *addEvent + remove chan int + setBaseFee chan *big.Int +} + +func newAsyncPricedList(all *lookup) *asyncPricedList { + a := &asyncPricedList{ + priced: newPricedList(all), + quit: make(chan struct{}), + reheap: make(chan struct{}), + add: make(chan *addEvent), + remove: make(chan int), + setBaseFee: make(chan *big.Int), + } + go a.run() + return a +} + +// run is a loop that handles async operations: +// - reheap: reheap the whole priced list, to get the lowest gas price +// - put: add a transaction to the priced list +// - remove: remove transactions from the priced list +// - discard: remove transactions to make room for new ones +func (a *asyncPricedList) run() { + var reheap bool + var newOnes []*types.Transaction + var toRemove int = 0 + // current loop state + var currentDone chan struct{} = nil + var baseFee *big.Int = nil + for { + if currentDone == nil { + currentDone = make(chan struct{}) + go a.handle(reheap, newOnes, toRemove, baseFee, currentDone) + reheap, newOnes, toRemove, baseFee = false, nil, 0, nil + } + select { + case <-a.reheap: + reheap = true + + case add := <-a.add: + newOnes = append(newOnes, add.tx) + + case remove := <-a.remove: + toRemove += remove + + case baseFee = <-a.setBaseFee: + + case <-currentDone: + currentDone = nil + + case <-a.quit: + // Wait for current run to finish. + if currentDone != nil { + <-currentDone + } + return + } + } +} + +func (a *asyncPricedList) handle(reheap bool, newOnes []*types.Transaction, toRemove int, baseFee *big.Int, finished chan struct{}) { + defer close(finished) + a.mu.Lock() + defer a.mu.Unlock() + // add new transactions to the priced list + for _, tx := range newOnes { + a.priced.Put(tx, false) + } + // remove staled transactions from the priced list + a.priced.Removed(toRemove) + // reheap if needed + if reheap { + a.priced.Reheap() + // set the lowest priced transaction when reheap is done + var emptyTx *types.Transaction = nil + if len(a.priced.floating.list) > 0 { + a.floatingLowest.Store(a.priced.floating.list[0]) + } else { + a.floatingLowest.Store(emptyTx) + } + if len(a.priced.urgent.list) > 0 { + a.urgentLowest.Store(a.priced.urgent.list[0]) + } else { + a.urgentLowest.Store(emptyTx) + } + } + if baseFee != nil { + a.baseFee.Store(baseFee) + a.priced.SetBaseFee(baseFee) + } +} + +func (a *asyncPricedList) Staled() int { + // the Staled() of pricedList is thread-safe, so we don't need to lock here + return a.priced.Staled() +} + +func (a *asyncPricedList) Put(tx *types.Transaction, local bool) { + a.add <- &addEvent{tx, local} +} + +func (a *asyncPricedList) Removed(count int) { + a.remove <- count +} + +func (a *asyncPricedList) Underpriced(tx *types.Transaction) bool { + var urgentLowest, floatingLowest *types.Transaction = nil, nil + ul, fl := a.urgentLowest.Load(), a.floatingLowest.Load() + if ul != nil { + // be careful that ul might be nil + urgentLowest = ul.(*types.Transaction) + } + if fl != nil { + // be careful that fl might be nil + floatingLowest = fl.(*types.Transaction) + } + a.mu.Lock() + defer a.mu.Unlock() + return (urgentLowest == nil || a.priced.urgent.cmp(urgentLowest, tx) >= 0) && + (floatingLowest == nil || a.priced.floating.cmp(floatingLowest, tx) >= 0) && + (floatingLowest != nil || urgentLowest != nil) +} + +// Disacard cleans staled transactions to make room for new ones +func (a *asyncPricedList) Discard(slots int, force bool) (types.Transactions, bool) { + a.mu.Lock() + defer a.mu.Unlock() + return a.priced.Discard(slots, force) +} + +func (a *asyncPricedList) NeedReheap(currHead *types.Header) bool { + return false +} + +func (a *asyncPricedList) Reheap() { + a.reheap <- struct{}{} +} + +func (a *asyncPricedList) SetBaseFee(baseFee *big.Int) { + a.setBaseFee <- baseFee + a.reheap <- struct{}{} +} + +func (a *asyncPricedList) SetHead(currHead *types.Header) { + //do nothing +} + +func (a *asyncPricedList) GetBaseFee() *big.Int { + baseFee := a.baseFee.Load() + if baseFee == nil { + return big.NewInt(0) + } + return baseFee.(*big.Int) +} + +func (a *asyncPricedList) Close() { + close(a.quit) +} + +func (a *asyncPricedList) TxCount() int { + a.mu.Lock() + defer a.mu.Unlock() + return a.priced.TxCount() +} diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index d69f6fe9d..c82b2eeb7 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -153,7 +153,8 @@ type BlockChain interface { // Config are the configuration parameters of the transaction pool. type Config struct { - EnableCache bool // enable pending cache for mining. Set as true only --mine option is enabled + EnableAsyncPriced bool // enable async pricedlist. Set as true only --txpool.enableasyncpriced option is enabled + EnableCache bool // enable pending cache for mining. Set as true only --mine option is enabled Locals []common.Address // Addresses that should be treated by default as local NoLocals bool // Whether local transaction handling should be disabled @@ -238,6 +239,9 @@ func (config *Config) sanitize() Config { log.Warn("Sanitizing invalid txpool reannounce time", "provided", conf.ReannounceTime, "updated", time.Minute) conf.ReannounceTime = time.Minute } + if config.EnableAsyncPriced { + log.Info("Enabling async pricedlist") + } // log to inform user if the cache is enabled or not if conf.EnableCache { log.Info("legacytxpool Pending Cache is enabled") @@ -276,7 +280,7 @@ type LegacyPool struct { queue map[common.Address]*list // Queued but non-processable transactions beats map[common.Address]time.Time // Last heartbeat from each known account all *lookup // All transactions to allow lookups - priced *pricedList // All transactions sorted by price + priced pricedListInterface // All transactions sorted by price pendingCounter int queueCounter int @@ -333,7 +337,11 @@ func New(config Config, chain BlockChain) *LegacyPool { pool.locals.add(addr) pool.pendingCache.markLocal(addr) } - pool.priced = newPricedList(pool.all) + if config.EnableAsyncPriced { + pool.priced = newAsyncPricedList(pool.all) + } else { + pool.priced = newPricedList(pool.all) + } if (!config.NoLocals || config.JournalRemote) && config.Journal != "" { pool.journal = newTxJournal(config.Journal) @@ -419,6 +427,7 @@ func (pool *LegacyPool) loop() { defer evict.Stop() defer journal.Stop() defer reannounce.Stop() + defer pool.priced.Close() // Notify tests that the init phase is done close(pool.initDoneCh) @@ -433,7 +442,7 @@ func (pool *LegacyPool) loop() { pool.mu.RLock() pending, queued := pool.stats() pool.mu.RUnlock() - stales := int(pool.priced.stales.Load()) + stales := pool.priced.Staled() if pending != prevPending || queued != prevQueued || stales != prevStales { log.Debug("Transaction pool status report", "executable", pending, "queued", queued, "stales", stales) @@ -882,16 +891,6 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e } // If the transaction pool is full, discard underpriced transactions if uint64(pool.all.Slots()+numSlots(tx)) > pool.config.GlobalSlots+pool.config.GlobalQueue { - currHead := pool.currentHead.Load() - if currHead != nil && currHead.BaseFee != nil && pool.priced.NeedReheap(currHead) { - if pool.chainconfig.IsLondon(new(big.Int).Add(currHead.Number, big.NewInt(1))) { - baseFee := eip1559.CalcBaseFee(pool.chainconfig, currHead, currHead.Time+1) - pool.priced.SetBaseFee(baseFee) - } - pool.priced.Reheap() - pool.priced.currHead = currHead - } - // If the new transaction is underpriced, don't accept it if !isLocal && pool.priced.Underpriced(tx) { log.Trace("Discarding underpriced transaction", "hash", hash, "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap()) @@ -1509,11 +1508,13 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, if reset != nil { pool.demoteUnexecutables(demoteAddrs) demoteTimer.UpdateSince(t0) - var pendingBaseFee = pool.priced.urgent.baseFee + var pendingBaseFee = pool.priced.GetBaseFee() if reset.newHead != nil { if pool.chainconfig.IsLondon(new(big.Int).Add(reset.newHead.Number, big.NewInt(1))) { pendingBaseFee = eip1559.CalcBaseFee(pool.chainconfig, reset.newHead, reset.newHead.Time+1) pool.priced.SetBaseFee(pendingBaseFee) + } else { + pool.priced.Reheap() } } // Update all accounts to the latest known pending nonce diff --git a/core/txpool/legacypool/legacypool_test.go b/core/txpool/legacypool/legacypool_test.go index b587f3676..8e37e3908 100644 --- a/core/txpool/legacypool/legacypool_test.go +++ b/core/txpool/legacypool/legacypool_test.go @@ -197,7 +197,11 @@ func validatePoolInternals(pool *LegacyPool) error { return fmt.Errorf("total transaction count %d != %d pending + %d queued", total, pending, queued) } pool.priced.Reheap() - priced, remote := pool.priced.urgent.Len()+pool.priced.floating.Len(), pool.all.RemoteCount() + if pool.config.EnableAsyncPriced { + // sleep a bit to wait for the reheap to finish + time.Sleep(50 * time.Millisecond) + } + priced, remote := pool.priced.TxCount(), pool.all.RemoteCount() if priced != remote { return fmt.Errorf("total priced transaction count %d != %d", priced, remote) } @@ -1873,6 +1877,122 @@ func TestUnderpricing(t *testing.T) { } } +func TestAsyncUnderpricing(t *testing.T) { + t.Parallel() + + // Create the pool to test the pricing enforcement with + statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(rawdb.NewMemoryDatabase()), nil) + blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed)) + + config := testTxPoolConfig + config.GlobalSlots = 2 + config.GlobalQueue = 2 + config.EnableAsyncPriced = true + + pool := New(config, blockchain) + pool.Init(config.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()) + defer pool.Close() + + // Keep track of transaction events to ensure all executables get announced + events := make(chan core.NewTxsEvent, 32) + sub := pool.txFeed.Subscribe(events) + defer sub.Unsubscribe() + + // Create a number of test accounts and fund them + keys := make([]*ecdsa.PrivateKey, 5) + for i := 0; i < len(keys); i++ { + keys[i], _ = crypto.GenerateKey() + testAddBalance(pool, crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000)) + } + // Generate and queue a batch of transactions, both pending and queued + txs := types.Transactions{} + + txs = append(txs, pricedTransaction(0, 100000, big.NewInt(1), keys[0])) + txs = append(txs, pricedTransaction(1, 100000, big.NewInt(2), keys[0])) + + txs = append(txs, pricedTransaction(1, 100000, big.NewInt(1), keys[1])) + + ltx := pricedTransaction(0, 100000, big.NewInt(1), keys[2]) + + // Import the batch and that both pending and queued transactions match up + pool.addRemotes(txs) + pool.addLocal(ltx) + + // sleep a bit to wait for priced transactions to be processed in parallel + time.Sleep(50 * time.Millisecond) + + pending, queued := pool.Stats() + if pending != 3 { + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 3) + } + if queued != 1 { + t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1) + } + if err := validateEvents(events, 3); err != nil { + t.Fatalf("original event firing failed: %v", err) + } + if err := validatePoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) + } + // Ensure that adding an underpriced transaction on block limit fails + if err := pool.addRemoteSync(pricedTransaction(0, 100000, big.NewInt(1), keys[1])); !errors.Is(err, txpool.ErrUnderpriced) { + t.Fatalf("adding underpriced pending transaction error mismatch: have %v, want %v", err, txpool.ErrUnderpriced) + } + // Replace a future transaction with a future transaction + if err := pool.addRemoteSync(pricedTransaction(1, 100000, big.NewInt(2), keys[1])); err != nil { // +K1:1 => -K1:1 => Pend K0:0, K0:1, K2:0; Que K1:1 + t.Fatalf("failed to add well priced transaction: %v", err) + } + // Ensure that adding high priced transactions drops cheap ones, but not own + if err := pool.addRemoteSync(pricedTransaction(0, 100000, big.NewInt(3), keys[1])); err != nil { // +K1:0 => -K1:1 => Pend K0:0, K0:1, K1:0, K2:0; Que - + t.Fatalf("failed to add well priced transaction: %v", err) + } + if err := pool.addRemoteSync(pricedTransaction(2, 100000, big.NewInt(4), keys[1])); err != nil { // +K1:2 => -K0:0 => Pend K1:0, K2:0; Que K0:1 K1:2 + t.Fatalf("failed to add well priced transaction: %v", err) + } + if err := pool.addRemote(pricedTransaction(3, 100000, big.NewInt(5), keys[1])); err != nil { // +K1:3 => -K0:1 => Pend K1:0, K2:0; Que K1:2 K1:3 + t.Fatalf("failed to add well priced transaction: %v", err) + } + // Ensure that replacing a pending transaction with a future transaction fails + if err := pool.addRemote(pricedTransaction(5, 100000, big.NewInt(6), keys[1])); err != txpool.ErrFutureReplacePending { + t.Fatalf("adding future replace transaction error mismatch: have %v, want %v", err, txpool.ErrFutureReplacePending) + } + pending, queued = pool.Stats() + if pending != 2 { + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2) + } + if queued != 2 { + t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2) + } + if err := validateEvents(events, 2); err != nil { + t.Fatalf("additional event firing failed: %v", err) + } + if err := validatePoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) + } + // Ensure that adding local transactions can push out even higher priced ones + ltx = pricedTransaction(1, 100000, big.NewInt(0), keys[2]) + if err := pool.addLocal(ltx); err != nil { + t.Fatalf("failed to append underpriced local transaction: %v", err) + } + ltx = pricedTransaction(0, 100000, big.NewInt(0), keys[3]) + if err := pool.addLocal(ltx); err != nil { + t.Fatalf("failed to add new underpriced local transaction: %v", err) + } + pending, queued = pool.Stats() + if pending != 3 { + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 3) + } + if queued != 1 { + t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1) + } + if err := validateEvents(events, 2); err != nil { + t.Fatalf("local event firing failed: %v", err) + } + if err := validatePoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) + } +} + // Tests that more expensive transactions push out cheap ones from the pool, but // without producing instability by creating gaps that start jumping transactions // back and forth between queued/pending. @@ -1941,6 +2061,186 @@ func TestStableUnderpricing(t *testing.T) { } } +// Tests that more expensive transactions push out cheap ones from the pool, but +// without producing instability by creating gaps that start jumping transactions +// back and forth between queued/pending. +func TestAsyncStableUnderpricing(t *testing.T) { + t.Parallel() + + // Create the pool to test the pricing enforcement with + statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(rawdb.NewMemoryDatabase()), nil) + blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed)) + + config := testTxPoolConfig + config.GlobalSlots = 128 + config.GlobalQueue = 0 + config.EnableAsyncPriced = true + + pool := New(config, blockchain) + pool.Init(config.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()) + defer pool.Close() + + // Keep track of transaction events to ensure all executables get announced + events := make(chan core.NewTxsEvent, 32) + sub := pool.txFeed.Subscribe(events) + defer sub.Unsubscribe() + + // Create a number of test accounts and fund them + keys := make([]*ecdsa.PrivateKey, 2) + for i := 0; i < len(keys); i++ { + keys[i], _ = crypto.GenerateKey() + testAddBalance(pool, crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000)) + } + // Fill up the entire queue with the same transaction price points + txs := types.Transactions{} + for i := uint64(0); i < config.GlobalSlots; i++ { + txs = append(txs, pricedTransaction(i, 100000, big.NewInt(1), keys[0])) + } + pool.addRemotesSync(txs) + + pending, queued := pool.Stats() + if pending != int(config.GlobalSlots) { + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, config.GlobalSlots) + } + if queued != 0 { + t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0) + } + if err := validateEvents(events, int(config.GlobalSlots)); err != nil { + t.Fatalf("original event firing failed: %v", err) + } + if err := validatePoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) + } + // Ensure that adding high priced transactions drops a cheap, but doesn't produce a gap + if err := pool.addRemoteSync(pricedTransaction(0, 100000, big.NewInt(3), keys[1])); err != nil { + t.Fatalf("failed to add well priced transaction: %v", err) + } + pending, queued = pool.Stats() + if pending != int(config.GlobalSlots) { + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, config.GlobalSlots) + } + if queued != 0 { + t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0) + } + if err := validateEvents(events, 1); err != nil { + t.Fatalf("additional event firing failed: %v", err) + } + if err := validatePoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) + } +} + +// Tests that when the pool reaches its global transaction limit, underpriced +// transactions (legacy & dynamic fee) are gradually shifted out for more +// expensive ones and any gapped pending transactions are moved into the queue. +// +// Note, local transactions are never allowed to be dropped. +func TestAsyncUnderpricingDynamicFee(t *testing.T) { + t.Parallel() + + pool, _ := setupPoolWithConfig(eip1559Config) + defer pool.Close() + + pool.config.GlobalSlots = 2 + pool.config.GlobalQueue = 2 + pool.config.EnableAsyncPriced = true + + // Keep track of transaction events to ensure all executables get announced + events := make(chan core.NewTxsEvent, 32) + sub := pool.txFeed.Subscribe(events) + defer sub.Unsubscribe() + + // Create a number of test accounts and fund them + keys := make([]*ecdsa.PrivateKey, 4) + for i := 0; i < len(keys); i++ { + keys[i], _ = crypto.GenerateKey() + testAddBalance(pool, crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000)) + } + + // Generate and queue a batch of transactions, both pending and queued + txs := types.Transactions{} + + txs = append(txs, dynamicFeeTx(0, 100000, big.NewInt(3), big.NewInt(2), keys[0])) + txs = append(txs, pricedTransaction(1, 100000, big.NewInt(2), keys[0])) + txs = append(txs, dynamicFeeTx(1, 100000, big.NewInt(2), big.NewInt(1), keys[1])) + + ltx := dynamicFeeTx(0, 100000, big.NewInt(2), big.NewInt(1), keys[2]) + + // Import the batch and that both pending and queued transactions match up + pool.addRemotes(txs) // Pend K0:0, K0:1; Que K1:1 + pool.addLocal(ltx) // +K2:0 => Pend K0:0, K0:1, K2:0; Que K1:1 + + pending, queued := pool.Stats() + if pending != 3 { + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 3) + } + if queued != 1 { + t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1) + } + if err := validateEvents(events, 3); err != nil { + t.Fatalf("original event firing failed: %v", err) + } + if err := validatePoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) + } + + // Ensure that adding an underpriced transaction fails + tx := dynamicFeeTx(0, 100000, big.NewInt(2), big.NewInt(1), keys[1]) + if err := pool.addRemote(tx); !errors.Is(err, txpool.ErrUnderpriced) { // Pend K0:0, K0:1, K2:0; Que K1:1 + t.Fatalf("adding underpriced pending transaction error mismatch: have %v, want %v", err, txpool.ErrUnderpriced) + } + + // Ensure that adding high priced transactions drops cheap ones, but not own + tx = pricedTransaction(0, 100000, big.NewInt(2), keys[1]) + if err := pool.addRemote(tx); err != nil { // +K1:0, -K1:1 => Pend K0:0, K0:1, K1:0, K2:0; Que - + t.Fatalf("failed to add well priced transaction: %v", err) + } + + tx = pricedTransaction(1, 100000, big.NewInt(3), keys[1]) + if err := pool.addRemoteSync(tx); err != nil { // +K1:2, -K0:1 => Pend K0:0 K1:0, K2:0; Que K1:2 + t.Fatalf("failed to add well priced transaction: %v", err) + } + tx = dynamicFeeTx(2, 100000, big.NewInt(4), big.NewInt(1), keys[1]) + if err := pool.addRemoteSync(tx); err != nil { // +K1:3, -K1:0 => Pend K0:0 K2:0; Que K1:2 K1:3 + t.Fatalf("failed to add well priced transaction: %v", err) + } + pending, queued = pool.Stats() + if pending != 2 { + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2) + } + if queued != 2 { + t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2) + } + if err := validateEvents(events, 2); err != nil { + t.Fatalf("additional event firing failed: %v", err) + } + if err := validatePoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) + } + // Ensure that adding local transactions can push out even higher priced ones + ltx = dynamicFeeTx(1, 100000, big.NewInt(0), big.NewInt(0), keys[2]) + if err := pool.addLocal(ltx); err != nil { + t.Fatalf("failed to append underpriced local transaction: %v", err) + } + ltx = dynamicFeeTx(0, 100000, big.NewInt(0), big.NewInt(0), keys[3]) + if err := pool.addLocal(ltx); err != nil { + t.Fatalf("failed to add new underpriced local transaction: %v", err) + } + pending, queued = pool.Stats() + if pending != 3 { + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 3) + } + if queued != 1 { + t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1) + } + if err := validateEvents(events, 2); err != nil { + t.Fatalf("local event firing failed: %v", err) + } + if err := validatePoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) + } +} + // Tests that when the pool reaches its global transaction limit, underpriced // transactions (legacy & dynamic fee) are gradually shifted out for more // expensive ones and any gapped pending transactions are moved into the queue. @@ -2097,7 +2397,6 @@ func TestDualHeapEviction(t *testing.T) { add(false) for baseFee = 0; baseFee <= 1000; baseFee += 100 { pool.priced.SetBaseFee(big.NewInt(int64(baseFee))) - pool.priced.Reheap() add(true) check(highCap, "fee cap") add(false) diff --git a/core/txpool/legacypool/list.go b/core/txpool/legacypool/list.go index 6b823a4a7..181fadb47 100644 --- a/core/txpool/legacypool/list.go +++ b/core/txpool/legacypool/list.go @@ -537,6 +537,21 @@ func (h *priceHeap) Pop() interface{} { return x } +var _ pricedListInterface = (*pricedList)(nil) + +type pricedListInterface interface { + Put(tx *types.Transaction, local bool) + Removed(count int) + Underpriced(tx *types.Transaction) bool + Discard(slots int, force bool) (types.Transactions, bool) + Reheap() + SetBaseFee(baseFee *big.Int) + GetBaseFee() *big.Int + Staled() int + TxCount() int + Close() +} + // pricedList is a price-sorted heap to allow operating on transactions pool // contents in a price-incrementing way. It's built upon the all transactions // in txpool but only interested in the remote part. It means only remote transactions @@ -549,7 +564,6 @@ func (h *priceHeap) Pop() interface{} { // better candidates for inclusion while in other cases (at the top of the baseFee peak) // the floating heap is better. When baseFee is decreasing they behave similarly. type pricedList struct { - currHead *types.Header // Current block header for effective tip calculation // Number of stale price points to (re-heap trigger). stales atomic.Int64 @@ -571,6 +585,10 @@ func newPricedList(all *lookup) *pricedList { } } +func (l *pricedList) Staled() int { + return int(l.stales.Load()) +} + // Put inserts a new transaction into the heap. func (l *pricedList) Put(tx *types.Transaction, local bool) { if local { @@ -668,10 +686,6 @@ func (l *pricedList) Discard(slots int, force bool) (types.Transactions, bool) { return drop, true } -func (l *pricedList) NeedReheap(currHead *types.Header) bool { - return l.currHead == nil || currHead == nil || currHead.Hash().Cmp(l.currHead.Hash()) != 0 -} - // Reheap forcibly rebuilds the heap based on the current remote transaction set. func (l *pricedList) Reheap() { l.reheapMu.Lock() @@ -703,4 +717,18 @@ func (l *pricedList) Reheap() { // necessary to call right before SetBaseFee when processing a new block. func (l *pricedList) SetBaseFee(baseFee *big.Int) { l.urgent.baseFee = baseFee + l.Reheap() +} + +// GetBaseFee returns the current base fee used for sorting the urgent heap. +func (l *pricedList) GetBaseFee() *big.Int { + return l.urgent.baseFee +} + +func (l *pricedList) TxCount() int { + return len(l.urgent.list) + len(l.floating.list) +} + +func (l *pricedList) Close() { + //do nothing } diff --git a/miner/worker.go b/miner/worker.go index 3ea2bfc76..5cf742bcd 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -1296,7 +1296,7 @@ func (w *worker) fillTransactions(interrupt *atomic.Int32, env *environment) err pendingBlobTxs := w.eth.TxPool().Pending(filter) packFromTxpoolTimer.UpdateSince(start) - log.Debug("packFromTxpoolTimer", "duration", common.PrettyDuration(time.Since(start)), "hash", env.header.Hash()) + log.Debug("packFromTxpoolTimer", "duration", common.PrettyDuration(time.Since(start)), "hash", env.header.Hash(), "txs", len(pendingPlainTxs)) // Split the pending transactions into locals and remotes. localPlainTxs, remotePlainTxs := make(map[common.Address][]*txpool.LazyTransaction), pendingPlainTxs From 1d7cb0dab85a0af88cafcb34a76cd8cee2eded65 Mon Sep 17 00:00:00 2001 From: andyzhang2023 <147463846+andyzhang2023@users.noreply.github.com> Date: Fri, 10 Jan 2025 10:43:58 +0800 Subject: [PATCH 6/6] chore: txpool optimization metrics (#247) Co-authored-by: andyzhang2023 --- core/txpool/legacypool/legacypool.go | 103 +++++++++++++++++++-------- eth/fetcher/tx_fetcher.go | 8 +-- eth/handler.go | 10 +-- eth/protocols/eth/broadcast.go | 6 +- eth/protocols/eth/peer.go | 2 +- 5 files changed, 83 insertions(+), 46 deletions(-) diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index c82b2eeb7..740ffffd1 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -115,6 +115,8 @@ var ( // demote metrics // demoteDuration measures how long time a demotion takes. demoteTxMeter = metrics.NewRegisteredMeter("txpool/demote/tx", nil) + promoteTxMeter = metrics.NewRegisteredMeter("txpool/promote/tx", nil) + sendFeedTxMeter = metrics.NewRegisteredMeter("txpool/sendfeed/tx", nil) resetDepthMeter = metrics.NewRegisteredMeter("txpool/reset/depth", nil) //reorg depth of blocks which causes demote // mutex latency metrics @@ -123,16 +125,27 @@ var ( journalMutexTimer = metrics.NewRegisteredTimer("txpool/mutex/journal/duration", nil) // latency of add() method - addTimer = metrics.NewRegisteredTimer("txpool/addtime", nil) - addWithLockTimer = metrics.NewRegisteredTimer("txpool/locked/addtime", nil) + addTimer = metrics.NewRegisteredTimer("txpool/addtime", nil) + addWithLockTimer = metrics.NewRegisteredTimer("txpool/locked/addtime", nil) + addWaitLockTimer = metrics.NewRegisteredTimer("txpool/locked/waittime", nil) + validateBasicTimer = metrics.NewRegisteredTimer("txpool/validate/basic", nil) + + // reset detail metrics + resetCount = metrics.NewRegisteredCounter("txpool/reset/count", nil) + resetTimer = metrics.NewRegisteredTimer("txpool/reset/time", nil) + resetWaitLockTimer = metrics.NewRegisteredTimer("txpool/reset/wait/time", nil) + resetResetTimer = metrics.NewRegisteredTimer("txpool/reset/reset/time", nil) + resetPromoteTimer = metrics.NewRegisteredTimer("txpool/reset/promote/time", nil) + resetDemoteTimer = metrics.NewRegisteredTimer("txpool/reset/demote/time", nil) + resetReheapTimer = metrics.NewRegisteredTimer("txpool/reset/reheap/time", nil) + resetFeedTimer = metrics.NewRegisteredTimer("txpool/reset/feed/time", nil) // reorg detail metrics - resetTimer = metrics.NewRegisteredTimer("txpool/resettime", nil) - promoteTimer = metrics.NewRegisteredTimer("txpool/promotetime", nil) - demoteTimer = metrics.NewRegisteredTimer("txpool/demotetime", nil) - reorgresetTimer = metrics.NewRegisteredTimer("txpool/reorgresettime", nil) - truncateTimer = metrics.NewRegisteredTimer("txpool/truncatetime", nil) - reorgresetNoblockingTimer = metrics.NewRegisteredTimer("txpool/noblocking/reorgresettime", nil) + reorgCount = metrics.NewRegisteredCounter("txpool/reorg/count", nil) + reorgTimer = metrics.NewRegisteredTimer("txpool/reorg/time", nil) + reorgWaitLockTimer = metrics.NewRegisteredTimer("txpool/reorg/wait/time", nil) + reorgPromoteTimer = metrics.NewRegisteredTimer("txpool/reorg/promote/time", nil) + reorgFeedTimer = metrics.NewRegisteredTimer("txpool/reorg/feed/time", nil) ) // BlockChain defines the minimal set of methods needed to back a tx pool with @@ -1162,11 +1175,12 @@ func (pool *LegacyPool) addRemoteSync(tx *types.Transaction) error { // If sync is set, the method will block until all internal maintenance related // to the add is finished. Only use this during tests for determinism! func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error { + start := time.Now() defer func(t0 time.Time) { if len(txs) > 0 { addTimer.Update(time.Since(t0) / time.Duration(len(txs))) } - }(time.Now()) + }(start) // Do not treat as local if local transactions have been disabled local = local && !pool.config.NoLocals @@ -1199,7 +1213,10 @@ func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error } // Process all the new transaction and merge any errors into the original slice + validateBasicTimer.UpdateSince(start) + tw := time.Now() pool.mu.Lock() + addWaitLockTimer.UpdateSince(tw) t0 := time.Now() newErrs, dirtyAddrs := pool.addTxsLocked(news, local) if len(news) > 0 { @@ -1458,15 +1475,41 @@ func (pool *LegacyPool) scheduleReorgLoop() { // runReorg runs reset and promoteExecutables on behalf of scheduleReorgLoop. func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirtyAccounts *accountSet, events map[common.Address]*sortedMap) { + var reorgCost, waittime, resetDur, demoteDur, reheapDur, promoteDur, sendfeedDur time.Duration + var t0 time.Time + var promoted []*types.Transaction + var demoted, sendFeed int + var start = time.Now() defer func(t0 time.Time) { - reorgDurationTimer.Update(time.Since(t0)) + reorgCost = time.Since(t0) + demoteTxMeter.Mark(int64(demoted)) + promoteTxMeter.Mark(int64(len(promoted))) + sendFeedTxMeter.Mark(int64(sendFeed)) + if reset != nil { - reorgresetTimer.UpdateSince(t0) - if reset.newHead != nil { - log.Info("Transaction pool reorged", "from", reset.oldHead.Number.Uint64(), "to", reset.newHead.Number.Uint64()) + resetCount.Inc(1) + resetTimer.Update(reorgCost) + resetWaitLockTimer.Update(waittime) + resetResetTimer.Update(resetDur) + resetDemoteTimer.Update(demoteDur) + resetReheapTimer.Update(reheapDur) + resetPromoteTimer.Update(promoteDur) + resetFeedTimer.Update(sendfeedDur) + + pending, queued := pool.stats() + if reset.newHead != nil && reset.oldHead != nil { + log.Info("Transaction pool reorged", "from", reset.oldHead.Number.Uint64(), "to", reset.newHead.Number.Uint64(), + "reorgCost", reorgCost, "waittime", waittime, "promoted", len(promoted), "demoted", demoted, "sendFeed", sendFeed, "pending", pending, "queued", queued, + "resetDur", resetDur, "demoteDur", demoteDur, "reheapDur", reheapDur, "promoteDur", promoteDur, "sendfeedDur", sendfeedDur) } + } else { + reorgCount.Inc(1) + reorgTimer.Update(reorgCost) + reorgWaitLockTimer.Update(waittime) + reorgPromoteTimer.Update(promoteDur) + reorgFeedTimer.Update(sendfeedDur) } - }(time.Now()) + }(start) defer close(done) var promoteAddrs, demoteAddrs []common.Address @@ -1476,12 +1519,12 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, // the flatten operation can be avoided. promoteAddrs = dirtyAccounts.flatten() } + tw := time.Now() pool.mu.Lock() - tl, t0 := time.Now(), time.Now() + waittime, t0 = time.Since(tw), time.Now() if reset != nil { // Reset from the old head to the new, rescheduling any reorged transactions demoteAddrs = pool.reset(reset.oldHead, reset.newHead) - resetTimer.UpdateSince(t0) // Nonces were reset, discard any events that became stale for addr := range events { @@ -1496,18 +1539,18 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, promoteAddrs = append(promoteAddrs, addr) } } + resetDur = time.Since(t0) // Check for pending transactions for every account that sent new ones t0 = time.Now() - promoted := pool.promoteExecutables(promoteAddrs) - promoteTimer.UpdateSince(t0) + promoted = pool.promoteExecutables(promoteAddrs) + promoteDur, t0 = time.Since(t0), time.Now() // If a new block appeared, validate the pool of pending transactions. This will // remove any transaction that has been included in the block or was invalidated // because of another transaction (e.g. higher gas price). - t0 = time.Now() if reset != nil { - pool.demoteUnexecutables(demoteAddrs) - demoteTimer.UpdateSince(t0) + demoted = pool.demoteUnexecutables(demoteAddrs) + demoteDur, t0 = time.Since(t0), time.Now() var pendingBaseFee = pool.priced.GetBaseFee() if reset.newHead != nil { if pool.chainconfig.IsLondon(new(big.Int).Add(reset.newHead.Number, big.NewInt(1))) { @@ -1524,17 +1567,16 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, nonces[addr] = highestPending.Nonce() + 1 } pool.pendingNonces.setAll(nonces) + reheapDur, t0 = time.Since(t0), time.Now() } // Ensure pool.queue and pool.pending sizes stay within the configured limits. - t0 = time.Now() pool.truncatePending() pool.truncateQueue() - truncateTimer.UpdateSince(t0) dropBetweenReorgHistogram.Update(int64(pool.changesSinceReorg)) pool.changesSinceReorg = 0 // Reset change counter - reorgresetNoblockingTimer.UpdateSince(tl) pool.mu.Unlock() + t0 = time.Now() // Notify subsystems for newly added transactions for _, tx := range promoted { @@ -1550,7 +1592,9 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, txs = append(txs, set.Flatten()...) } pool.txFeed.Send(core.NewTxsEvent{Txs: txs}) + sendFeed = len(txs) } + sendfeedDur = time.Since(t0) } // reset retrieves the current state of the blockchain and ensures the content @@ -1666,14 +1710,13 @@ func (pool *LegacyPool) reset(oldHead, newHead *types.Header) (demoteAddrs []com } } resetDepthMeter.Mark(int64(depth)) - log.Info("reset block depth", "depth", depth) // Initialize the internal state to the current head if newHead == nil { newHead = pool.chain.CurrentBlock() // Special case during testing } statedb, err := pool.chain.StateAt(newHead.Root) if err != nil { - log.Error("Failed to reset txpool state", "err", err) + log.Error("Failed to reset txpool state", "err", err, "depth", depth) return } pool.currentHead.Store(newHead) @@ -1687,7 +1730,7 @@ func (pool *LegacyPool) reset(oldHead, newHead *types.Header) (demoteAddrs []com } // Inject any transactions discarded due to reorgs - log.Debug("Reinjecting stale transactions", "count", len(reinject)) + log.Debug("reset state of txpool", "reinject", len(reinject), "depth", depth) core.SenderCacher.Recover(pool.signer, reinject) pool.addTxsLocked(reinject, false) return @@ -1925,14 +1968,14 @@ func (pool *LegacyPool) truncateQueue() { // Note: transactions are not marked as removed in the priced list because re-heaping // is always explicitly triggered by SetBaseFee and it would be unnecessary and wasteful // to trigger a re-heap is this function -func (pool *LegacyPool) demoteUnexecutables(demoteAddrs []common.Address) { +func (pool *LegacyPool) demoteUnexecutables(demoteAddrs []common.Address) int { + var demoteTxNum = 0 if demoteAddrs == nil { demoteAddrs = make([]common.Address, 0, len(pool.pending)) for addr := range pool.pending { demoteAddrs = append(demoteAddrs, addr) } } - demoteTxMeter.Mark(int64(len(demoteAddrs))) var removed = 0 // Iterate over all accounts and demote any non-executable transactions @@ -2001,8 +2044,10 @@ func (pool *LegacyPool) demoteUnexecutables(demoteAddrs []common.Address) { } pool.pendingCache.del(dropPendingCache, pool.signer) removed += len(dropPendingCache) + demoteTxNum += len(dropPendingCache) } pool.priced.Removed(removed) + return demoteTxNum } // addressByHeartbeat is an account address tagged with its last activity timestamp. diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go index e24cbaf25..cd7ed32c2 100644 --- a/eth/fetcher/tx_fetcher.go +++ b/eth/fetcher/tx_fetcher.go @@ -257,7 +257,7 @@ func (f *TxFetcher) Notify(peer string, types []byte, sizes []uint32, hashes []c duplicate++ case f.isKnownUnderpriced(hash): underpriced++ - log.Info("announced transaction is underpriced", "hash", hash.String()) + log.Trace("announced transaction is underpriced", "hash", hash.String()) default: unknownHashes = append(unknownHashes, hash) if types == nil { @@ -431,13 +431,13 @@ func (f *TxFetcher) loop() { // check. Should be fine as the limit is in the thousands and the // request size in the hundreds. txAnnounceDOSMeter.Mark(int64(len(ann.hashes))) - log.Info("announced transaction DOS overflow", "hashes", joinHashes(ann.hashes), "num", len(ann.hashes)) + log.Debug("announced transaction DOS overflow", "hashes", joinHashes(ann.hashes), "num", len(ann.hashes)) break } want := used + len(ann.hashes) if want > maxTxAnnounces { txAnnounceDOSMeter.Mark(int64(want - maxTxAnnounces)) - log.Info("announced transaction DOS overflow", "hashes", joinHashes(ann.hashes[want-maxTxAnnounces:]), "num", len(ann.hashes)) + log.Debug("announced transaction DOS overflow", "hashes", joinHashes(ann.hashes[want-maxTxAnnounces:]), "num", len(ann.hashes)) ann.hashes = ann.hashes[:want-maxTxAnnounces] ann.metas = ann.metas[:want-maxTxAnnounces] } @@ -556,7 +556,7 @@ func (f *TxFetcher) loop() { for peer, req := range f.requests { if time.Duration(f.clock.Now()-req.time)+txGatherSlack > txFetchTimeout { txRequestTimeoutMeter.Mark(int64(len(req.hashes))) - log.Info("announced transaction request timeout", "hashes", joinHashes(req.hashes), "num", len(req.hashes)) + log.Debug("announced transaction request timeout", "hashes", joinHashes(req.hashes), "num", len(req.hashes)) // Reschedule all the not-yet-delivered fetches to alternate peers for _, hash := range req.hashes { diff --git a/eth/handler.go b/eth/handler.go index bcb56d5b1..b138b66bb 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -688,31 +688,23 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) { // Send the tx unconditionally to a subset of our peers for _, peer := range direct { txset[peer] = append(txset[peer], tx.Hash()) - log.Trace("Broadcast transaction", "peer", peer.ID(), "hash", tx.Hash()) } // For the remaining peers, send announcement only for _, peer := range announce { annos[peer] = append(annos[peer], tx.Hash()) - log.Trace("Announce transaction", "peer", peer.ID(), "hash", tx.Hash()) } } for peer, hashes := range txset { directPeers++ directCount += len(hashes) peer.AsyncSendTransactions(hashes) - log.Trace("Transaction broadcast bodies", "txs", len(hashes), - "peer.id", peer.Node().ID().String(), "peer.IP", peer.Node().IP().String(), - ) } for peer, hashes := range annos { annPeers++ annCount += len(hashes) peer.AsyncSendPooledTransactionHashes(hashes) - log.Trace("Transaction broadcast hashes", "txs", len(hashes), - "peer.id", peer.Node().ID().String(), "peer.IP", peer.Node().IP().String(), - ) } - log.Debug("Distributed transactions", "plaintxs", len(txs)-blobTxs-largeTxs, "blobtxs", blobTxs, "largetxs", largeTxs, + log.Trace("Distributed transactions", "plaintxs", len(txs)-blobTxs-largeTxs, "blobtxs", blobTxs, "largetxs", largeTxs, "bcastpeers", directPeers, "bcastcount", directCount, "annpeers", annPeers, "anncount", annCount) } diff --git a/eth/protocols/eth/broadcast.go b/eth/protocols/eth/broadcast.go index 89c848a42..1fb4d92d8 100644 --- a/eth/protocols/eth/broadcast.go +++ b/eth/protocols/eth/broadcast.go @@ -115,7 +115,7 @@ func (p *Peer) broadcastTransactions() { return } close(done) - p.Log().Trace("Sent transaction bodies", "count", len(txs), "peer.id", p.Node().ID().String(), "peer.ip", p.Node().IP().String(), "hashes", concat(collectHashes(txs))) + p.Log().Trace("Sent transaction bodies", "count", len(txs), "peer.id", p.Node().ID().String(), "peer.ip", p.Node().IP().String()) }() } } @@ -129,7 +129,7 @@ func (p *Peer) broadcastTransactions() { // New batch of transactions to be broadcast, queue them (with cap) queue = append(queue, hashes...) if len(queue) > maxQueuedTxs { - p.Log().Warn("Broadcast hashes abandon", "peerId", p.ID(), "peerIP", safeGetPeerIP(p), "abandon", len(queue)-maxQueuedTxs, "hashes", concat(queue[:len(queue)-maxQueuedTxs])) + p.Log().Warn("Broadcast hashes abandon", "peerId", p.ID(), "peerIP", safeGetPeerIP(p), "abandon", len(queue)-maxQueuedTxs) txBroadcastAbandonMeter.Mark(int64(len(queue) - maxQueuedTxs)) // Fancy copy and resize to ensure buffer doesn't grow indefinitely queue = queue[:copy(queue, queue[len(queue)-maxQueuedTxs:])] @@ -192,7 +192,7 @@ func (p *Peer) announceTransactions() { return } close(done) - p.Log().Trace("Sent transaction announcements", "count", len(pending), "peer.Id", p.ID(), "peer.IP", p.Node().IP().String(), "hashes", concat(pending)) + p.Log().Trace("Sent transaction announcements", "count", len(pending), "peer.Id", p.ID(), "peer.IP", p.Node().IP().String()) }() } } diff --git a/eth/protocols/eth/peer.go b/eth/protocols/eth/peer.go index fc7c2a18e..c597e247f 100644 --- a/eth/protocols/eth/peer.go +++ b/eth/protocols/eth/peer.go @@ -450,7 +450,7 @@ func (p *Peer) RequestReceipts(hashes []common.Hash, sink chan *Response) (*Requ // RequestTxs fetches a batch of transactions from a remote node. func (p *Peer) RequestTxs(hashes []common.Hash) error { - p.Log().Debug("Fetching batch of transactions", "count", len(hashes)) + p.Log().Trace("Fetching batch of transactions", "count", len(hashes)) id := rand.Uint64() requestTracker.Track(p.id, p.version, GetPooledTransactionsMsg, PooledTransactionsMsg, id)