Skip to content

Commit

Permalink
Merge branch 'develop' into v0.5.5-changelog
Browse files Browse the repository at this point in the history
  • Loading branch information
owen-reorg authored Jan 10, 2025
2 parents 53185b5 + 1d7cb0d commit c669a91
Show file tree
Hide file tree
Showing 20 changed files with 871 additions and 183 deletions.
3 changes: 0 additions & 3 deletions accounts/abi/bind/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ func TestWaitDeployed(t *testing.T) {

// Send and mine the transaction.
backend.Client().SendTransaction(ctx, tx)
time.Sleep(500 * time.Millisecond) //wait for the tx to be mined
backend.Commit()

select {
Expand Down Expand Up @@ -118,7 +117,6 @@ func TestWaitDeployedCornerCases(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
backend.Client().SendTransaction(ctx, tx)
time.Sleep(500 * time.Millisecond) //wait for the tx to be mined
backend.Commit()
notContractCreation := errors.New("tx is not contract creation")
if _, err := bind.WaitDeployed(ctx, backend.Client(), tx); err.Error() != notContractCreation.Error() {
Expand All @@ -137,6 +135,5 @@ func TestWaitDeployedCornerCases(t *testing.T) {
}()

backend.Client().SendTransaction(ctx, tx)
time.Sleep(500 * time.Millisecond) //wait for the tx to be mined
cancel()
}
4 changes: 4 additions & 0 deletions cmd/geth/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,10 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
cfg.Eth.OverrideVerkle = &v
}

if ctx.Bool(utils.MiningEnabledFlag.Name) {
cfg.Eth.TxPool.EnableCache = true
}

backend, eth := utils.RegisterEthService(stack, &cfg.Eth)

// Create gauge with geth system and build information
Expand Down
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ var (
utils.TxPoolRejournalFlag,
utils.TxPoolPriceLimitFlag,
utils.TxPoolPriceBumpFlag,
utils.TxPoolEnableAsyncPricedFlag,
utils.TxPoolAccountSlotsFlag,
utils.TxPoolGlobalSlotsFlag,
utils.TxPoolAccountQueueFlag,
Expand Down
9 changes: 9 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,12 @@ var (
Value: ethconfig.Defaults.TxPool.PriceBump,
Category: flags.TxPoolCategory,
}
TxPoolEnableAsyncPricedFlag = &cli.BoolFlag{
Name: "txpool.asyncpriced",
Usage: "enable async-priced-sorted list for txpool",
Value: false,
Category: flags.TxPoolCategory,
}
TxPoolAccountSlotsFlag = &cli.Uint64Flag{
Name: "txpool.accountslots",
Usage: "Minimum number of executable transaction slots guaranteed per account",
Expand Down Expand Up @@ -1723,6 +1729,9 @@ func setTxPool(ctx *cli.Context, cfg *legacypool.Config) {
if ctx.IsSet(TxPoolPriceBumpFlag.Name) {
cfg.PriceBump = ctx.Uint64(TxPoolPriceBumpFlag.Name)
}
if ctx.IsSet(TxPoolEnableAsyncPricedFlag.Name) {
cfg.EnableAsyncPriced = ctx.Bool(TxPoolEnableAsyncPricedFlag.Name)
}
if ctx.IsSet(TxPoolAccountSlotsFlag.Name) {
cfg.AccountSlots = ctx.Uint64(TxPoolAccountSlotsFlag.Name)
}
Expand Down
192 changes: 192 additions & 0 deletions core/txpool/legacypool/async_priced_list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
package legacypool

import (
"math/big"
"sync"
"sync/atomic"

"github.com/ethereum/go-ethereum/core/types"
)

var _ pricedListInterface = &asyncPricedList{}

type addEvent struct {
tx *types.Transaction
local bool
}

type asyncPricedList struct {
priced *pricedList
floatingLowest atomic.Value
urgentLowest atomic.Value
baseFee atomic.Value
mu sync.Mutex

// events
quit chan struct{}
reheap chan struct{}
add chan *addEvent
remove chan int
setBaseFee chan *big.Int
}

func newAsyncPricedList(all *lookup) *asyncPricedList {
a := &asyncPricedList{
priced: newPricedList(all),
quit: make(chan struct{}),
reheap: make(chan struct{}),
add: make(chan *addEvent),
remove: make(chan int),
setBaseFee: make(chan *big.Int),
}
go a.run()
return a
}

// run is a loop that handles async operations:
// - reheap: reheap the whole priced list, to get the lowest gas price
// - put: add a transaction to the priced list
// - remove: remove transactions from the priced list
// - discard: remove transactions to make room for new ones
func (a *asyncPricedList) run() {
var reheap bool
var newOnes []*types.Transaction
var toRemove int = 0
// current loop state
var currentDone chan struct{} = nil
var baseFee *big.Int = nil
for {
if currentDone == nil {
currentDone = make(chan struct{})
go a.handle(reheap, newOnes, toRemove, baseFee, currentDone)
reheap, newOnes, toRemove, baseFee = false, nil, 0, nil
}
select {
case <-a.reheap:
reheap = true

case add := <-a.add:
newOnes = append(newOnes, add.tx)

case remove := <-a.remove:
toRemove += remove

case baseFee = <-a.setBaseFee:

case <-currentDone:
currentDone = nil

case <-a.quit:
// Wait for current run to finish.
if currentDone != nil {
<-currentDone
}
return
}
}
}

func (a *asyncPricedList) handle(reheap bool, newOnes []*types.Transaction, toRemove int, baseFee *big.Int, finished chan struct{}) {
defer close(finished)
a.mu.Lock()
defer a.mu.Unlock()
// add new transactions to the priced list
for _, tx := range newOnes {
a.priced.Put(tx, false)
}
// remove staled transactions from the priced list
a.priced.Removed(toRemove)
// reheap if needed
if reheap {
a.priced.Reheap()
// set the lowest priced transaction when reheap is done
var emptyTx *types.Transaction = nil
if len(a.priced.floating.list) > 0 {
a.floatingLowest.Store(a.priced.floating.list[0])
} else {
a.floatingLowest.Store(emptyTx)
}
if len(a.priced.urgent.list) > 0 {
a.urgentLowest.Store(a.priced.urgent.list[0])
} else {
a.urgentLowest.Store(emptyTx)
}
}
if baseFee != nil {
a.baseFee.Store(baseFee)
a.priced.SetBaseFee(baseFee)
}
}

func (a *asyncPricedList) Staled() int {
// the Staled() of pricedList is thread-safe, so we don't need to lock here
return a.priced.Staled()
}

func (a *asyncPricedList) Put(tx *types.Transaction, local bool) {
a.add <- &addEvent{tx, local}
}

func (a *asyncPricedList) Removed(count int) {
a.remove <- count
}

func (a *asyncPricedList) Underpriced(tx *types.Transaction) bool {
var urgentLowest, floatingLowest *types.Transaction = nil, nil
ul, fl := a.urgentLowest.Load(), a.floatingLowest.Load()
if ul != nil {
// be careful that ul might be nil
urgentLowest = ul.(*types.Transaction)
}
if fl != nil {
// be careful that fl might be nil
floatingLowest = fl.(*types.Transaction)
}
a.mu.Lock()
defer a.mu.Unlock()
return (urgentLowest == nil || a.priced.urgent.cmp(urgentLowest, tx) >= 0) &&
(floatingLowest == nil || a.priced.floating.cmp(floatingLowest, tx) >= 0) &&
(floatingLowest != nil || urgentLowest != nil)
}

// Disacard cleans staled transactions to make room for new ones
func (a *asyncPricedList) Discard(slots int, force bool) (types.Transactions, bool) {
a.mu.Lock()
defer a.mu.Unlock()
return a.priced.Discard(slots, force)
}

func (a *asyncPricedList) NeedReheap(currHead *types.Header) bool {
return false
}

func (a *asyncPricedList) Reheap() {
a.reheap <- struct{}{}
}

func (a *asyncPricedList) SetBaseFee(baseFee *big.Int) {
a.setBaseFee <- baseFee
a.reheap <- struct{}{}
}

func (a *asyncPricedList) SetHead(currHead *types.Header) {
//do nothing
}

func (a *asyncPricedList) GetBaseFee() *big.Int {
baseFee := a.baseFee.Load()
if baseFee == nil {
return big.NewInt(0)
}
return baseFee.(*big.Int)
}

func (a *asyncPricedList) Close() {
close(a.quit)
}

func (a *asyncPricedList) TxCount() int {
a.mu.Lock()
defer a.mu.Unlock()
return a.priced.TxCount()
}
63 changes: 13 additions & 50 deletions core/txpool/legacypool/cache_for_miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,35 +5,35 @@ import (
"sync"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/metrics"
"github.com/holiman/uint256"
)

var (
pendingCacheGauge = metrics.NewRegisteredGauge("txpool/legacypool/pending/cache", nil)
localCacheGauge = metrics.NewRegisteredGauge("txpool/legacypool/local/cache", nil)
)

type pendingCache interface {
add(types.Transactions, types.Signer)
del(types.Transactions, types.Signer)
dump() map[common.Address]types.Transactions
markLocal(common.Address)
flattenLocals() []common.Address
}

// copy of pending transactions
type cacheForMiner struct {
txLock sync.Mutex
pending map[common.Address]map[*types.Transaction]struct{}
locals map[common.Address]bool
addrLock sync.Mutex

allCache map[common.Address][]*txpool.LazyTransaction
filteredCache map[common.Address][]*txpool.LazyTransaction
cacheLock sync.Mutex
}

func newCacheForMiner() *cacheForMiner {
return &cacheForMiner{
pending: make(map[common.Address]map[*types.Transaction]struct{}),
locals: make(map[common.Address]bool),
allCache: make(map[common.Address][]*txpool.LazyTransaction),
filteredCache: make(map[common.Address][]*txpool.LazyTransaction),
pending: make(map[common.Address]map[*types.Transaction]struct{}),
locals: make(map[common.Address]bool),
}
}

Expand Down Expand Up @@ -75,9 +75,8 @@ func (pc *cacheForMiner) del(txs types.Transactions, signer types.Signer) {
}
}

func (pc *cacheForMiner) sync2cache(pool txpool.LazyResolver, filter func(txs types.Transactions, addr common.Address) types.Transactions) {
func (pc *cacheForMiner) dump() map[common.Address]types.Transactions {
pending := make(map[common.Address]types.Transactions)

pc.txLock.Lock()
for addr, txlist := range pc.pending {
pending[addr] = make(types.Transactions, 0, len(txlist))
Expand All @@ -86,46 +85,10 @@ func (pc *cacheForMiner) sync2cache(pool txpool.LazyResolver, filter func(txs ty
}
}
pc.txLock.Unlock()

// convert pending to lazyTransactions
filteredLazy := make(map[common.Address][]*txpool.LazyTransaction)
allLazy := make(map[common.Address][]*txpool.LazyTransaction)
for addr, txs := range pending {
for _, txs := range pending {
// sorted by nonce
sort.Sort(types.TxByNonce(txs))
filterd := filter(txs, addr)
if len(txs) > 0 {
lazies := make([]*txpool.LazyTransaction, len(txs))
for i, tx := range txs {
lazies[i] = &txpool.LazyTransaction{
Pool: pool,
Hash: tx.Hash(),
Tx: tx,
Time: tx.Time(),
GasFeeCap: uint256.MustFromBig(tx.GasFeeCap()),
GasTipCap: uint256.MustFromBig(tx.GasTipCap()),
Gas: tx.Gas(),
BlobGas: tx.BlobGas(),
}
}
allLazy[addr] = lazies
filteredLazy[addr] = lazies[:len(filterd)]
}
}

pc.cacheLock.Lock()
pc.filteredCache = filteredLazy
pc.allCache = allLazy
pc.cacheLock.Unlock()
}

func (pc *cacheForMiner) dump(filtered bool) map[common.Address][]*txpool.LazyTransaction {
pc.cacheLock.Lock()
pending := pc.allCache
if filtered {
pending = pc.filteredCache
}
pc.cacheLock.Unlock()
return pending
}

Expand All @@ -136,7 +99,7 @@ func (pc *cacheForMiner) markLocal(addr common.Address) {
pc.locals[addr] = true
}

func (pc *cacheForMiner) IsLocal(addr common.Address) bool {
func (pc *cacheForMiner) isLocal(addr common.Address) bool {
pc.addrLock.Lock()
defer pc.addrLock.Unlock()
return pc.locals[addr]
Expand Down
Loading

0 comments on commit c669a91

Please sign in to comment.