From 746438b005994dcc7d25c5fe986f9992f2ae8f7f Mon Sep 17 00:00:00 2001
From: andyzhang2023
Date: Fri, 10 Jan 2025 12:17:41 +0800
Subject: [PATCH 1/5] pevm opt: fall back to the sequential processor when the TxDAG's dependency depth is too deep

---
 cmd/geth/main.go                 |  1 +
 cmd/utils/flags.go               | 11 ++++++++
 core/blockchain.go               | 16 ++++++++++--
 core/parallel_state_scheduler.go | 45 +++++++++++++++++++-------------
 core/vm/interpreter.go           |  1 +
 eth/backend.go                   |  1 +
 eth/ethconfig/config.go          | 16 ++++++------
 7 files changed, 63 insertions(+), 28 deletions(-)

diff --git a/cmd/geth/main.go b/cmd/geth/main.go
index b4c79e26c..cc3ae5805 100644
--- a/cmd/geth/main.go
+++ b/cmd/geth/main.go
@@ -177,6 +177,7 @@ var (
 	utils.ParallelTxDAGFlag,
 	utils.ParallelTxDAGFileFlag,
 	utils.ParallelTxDAGSenderPrivFlag,
+	utils.ParallelTxDAGMaxDepthRatioFlag,
 	configFileFlag,
 	utils.LogDebugFlag,
 	utils.LogBacktraceAtFlag,
diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go
index 726f980ad..703fcaf2b 100644
--- a/cmd/utils/flags.go
+++ b/cmd/utils/flags.go
@@ -1136,6 +1136,13 @@ Please note that --` + MetricsHTTPFlag.Name + ` must be set to start the server.
 		Category: flags.VMCategory,
 	}
 
+	ParallelTxDAGMaxDepthRatioFlag = &cli.Float64Flag{
+		Name:     "parallel.txdag-max-depth-ratio",
+		Usage:    "A ratio to decide whether or not to execute transactions in parallel; it will fall back to the sequential processor if the depth ratio is larger than this value (default = 0.9)",
+		Value:    0.9,
+		Category: flags.VMCategory,
+	}
+
 	VMOpcodeOptimizeFlag = &cli.BoolFlag{
 		Name:     "vm.opcode.optimize",
 		Usage:    "enable opcode optimization",
@@ -2057,6 +2064,10 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
 		cfg.ParallelTxDAGFile = ctx.String(ParallelTxDAGFileFlag.Name)
 	}
 
+	if ctx.IsSet(ParallelTxDAGMaxDepthRatioFlag.Name) {
+		cfg.ParallelTxDAGMaxDepthRatio = ctx.Float64(ParallelTxDAGMaxDepthRatioFlag.Name)
+	}
+
 	if ctx.IsSet(ParallelTxDAGSenderPrivFlag.Name) {
 		priHex := ctx.String(ParallelTxDAGSenderPrivFlag.Name)
 		if cfg.Miner.ParallelTxDAGSenderPriv, err = crypto.HexToECDSA(priHex); err != nil {
diff --git a/core/blockchain.go b/core/blockchain.go
index 4695b270b..74c7ee5eb 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -1971,6 +1971,10 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
 	)
 	blockProcessedInParallel := false
 
+	var (
+		tooDeep bool
+		depth   int
+	)
 	// skip block process if we already have the state, receipts and logs from mining work
 	if !(receiptExist && logExist && stateExist) {
 		// Retrieve the parent block and it's state to execute on top
@@ -2016,10 +2020,16 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
 
 		statedb.SetExpectedStateRoot(block.Root())
 
+		// find out whether the dependencies of the block are too deep to be processed in parallel;
+		// if the dependencies are too deep, we will fall back to serial processing
+		txCount := len(block.Transactions())
+		_, depth = BuildTxLevels(txCount, bc.vmConfig.TxDAG)
+		tooDeep = float64(depth)/float64(txCount) > bc.vmConfig.TxDAGMaxDepthRatio
+
 		// Process block using the parent state as reference point
 		pstart = time.Now()
 		txDAGMissButNecessary := bc.vmConfig.TxDAG == nil && (bc.vmConfig.EnableParallelUnorderedMerge || bc.vmConfig.EnableTxParallelMerge)
-		useSerialProcessor := !bc.vmConfig.EnableParallelExec || txDAGMissButNecessary
+		useSerialProcessor := !bc.vmConfig.EnableParallelExec || txDAGMissButNecessary || tooDeep
 		if useSerialProcessor {
 			receipts, logs, usedGas, err = bc.serialProcessor.Process(block, statedb, bc.vmConfig)
 			blockProcessedInParallel = false
@@ -2143,7 +2153,9 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
 			"accountUpdates", common.PrettyDuration(timers.AccountUpdates),
 			"storageUpdates", common.PrettyDuration(timers.StorageUpdates),
 			"accountHashes", common.PrettyDuration(timers.AccountHashes),
-			"storageHashes", common.PrettyDuration(timers.StorageHashes))
+			"storageHashes", common.PrettyDuration(timers.StorageHashes),
+			"tooDeep", tooDeep, "depth", depth,
+		)
 
 		// Write the block to the chain and get the status.
 		var (
diff --git a/core/parallel_state_scheduler.go b/core/parallel_state_scheduler.go
index 25271bf0d..7467d58a3 100644
--- a/core/parallel_state_scheduler.go
+++ b/core/parallel_state_scheduler.go
@@ -350,8 +350,6 @@ func (tl TxLevel) predictTxDAG(dag types.TxDAG) {
 
 func NewTxLevels(all []*PEVMTxRequest, dag types.TxDAG) TxLevels {
 	var levels TxLevels = make(TxLevels, 0, 8)
-	var currLevel int = 0
-
 	var enlargeLevelsIfNeeded = func(currLevel int, levels *TxLevels) {
 		if len(*levels) <= currLevel {
 			for i := len(*levels); i <= currLevel; i++ {
@@ -367,22 +365,37 @@ func NewTxLevels(all []*PEVMTxRequest, dag types.TxDAG) TxLevels {
 		return TxLevels{all}
 	}
 
-	marked := make(map[int]int, len(all))
-	for _, tx := range all {
-		dep := dag.TxDep(tx.txIndex)
+	// build the levels from the DAG
+	marked, _ := BuildTxLevels(len(all), dag)
+	// put the transactions into the levels
+	for txIndex, tx := range all {
+		level := marked[txIndex]
+		enlargeLevelsIfNeeded(level, &levels)
+		levels[level] = append(levels[level], tx)
+	}
+	return levels
+}
+
+func BuildTxLevels(txCount int, dag types.TxDAG) (marked map[int]int, depth int) {
+	if dag == nil {
+		return make(map[int]int), 0
+	}
+	// marked records which level each transaction should be put in
+	marked = make(map[int]int, txCount)
+	// depth is the level cursor to put the transactions in
+	depth = 0
+	for txIndex := 0; txIndex < txCount; txIndex++ {
+		dep := dag.TxDep(txIndex)
 		switch true {
 		case dep != nil && dep.CheckFlag(types.ExcludedTxFlag),
 			dep != nil && dep.CheckFlag(types.NonDependentRelFlag):
 			// excluted tx, occupies the whole level
 			// or dependent-to-all tx, occupies the whole level, too
-			levels = append(levels, TxLevel{tx})
-			marked[tx.txIndex], currLevel = len(levels)-1, len(levels)
+			marked[txIndex], depth = depth, depth+1
 
 		case dep == nil || len(dep.TxIndexes) == 0:
-			// dependent on none
-			enlargeLevelsIfNeeded(currLevel, &levels)
-			levels[currLevel] = append(levels[currLevel], tx)
-			marked[tx.txIndex] = currLevel
+			// dependent on none, just put it in the current level
+			marked[txIndex] = depth
 
 		case dep != nil && len(dep.TxIndexes) > 0:
 			// dependent on others
@@ -395,19 +408,15 @@ func NewTxLevels(all []*PEVMTxRequest, dag types.TxDAG) TxLevels {
 			}
 			if prevLevel < 0 {
 				// broken DAG, just ignored it
-				enlargeLevelsIfNeeded(currLevel, &levels)
-				levels[currLevel] = append(levels[currLevel], tx)
-				marked[tx.txIndex] = currLevel
+				marked[txIndex] = depth
 				continue
 			}
-			enlargeLevelsIfNeeded(prevLevel+1, &levels)
-			levels[prevLevel+1] = append(levels[prevLevel+1], tx)
 			// record the level of this tx
-			marked[tx.txIndex] = prevLevel + 1
+			marked[txIndex] = prevLevel + 1
 
 		default:
 			panic("unexpected case")
 		}
 	}
-	return levels
+	return marked, depth
 }
diff --git a/core/vm/interpreter.go b/core/vm/interpreter.go
index a0d2ebb82..7b4a2cc64 100644
--- a/core/vm/interpreter.go
+++ b/core/vm/interpreter.go
@@ -41,6 +41,7 @@ type Config struct {
 	TxDAG                        types.TxDAG
 	EnableParallelUnorderedMerge bool // Whether to enable unordered merge in parallel mode
 	EnableTxParallelMerge        bool // Whether to enable parallel merge in parallel mode
+	TxDAGMaxDepthRatio           float64 // fall back to serial execution when TxDAG depth / tx count exceeds this ratio
 }
 
 // ScopeContext contains the things that are per-call, such as stack and memory,
diff --git a/eth/backend.go b/eth/backend.go
index df7170818..0c83e36bf 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -242,6 +242,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
 			EnableTxParallelMerge:     config.ParallelTxParallelMerge,
 			ParallelTxNum:             config.ParallelTxNum,
 			EnableOpcodeOptimizations: config.EnableOpcodeOptimizing,
+			TxDAGMaxDepthRatio:        config.ParallelTxDAGMaxDepthRatio,
 		}
 		cacheConfig = &core.CacheConfig{
 			TrieCleanLimit: config.TrieCleanCache,
diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go
index 1d43484ab..27f56bf3c 100644
--- a/eth/ethconfig/config.go
+++ b/eth/ethconfig/config.go
@@ -219,14 +219,14 @@ type Config struct {
 	RollupDisableTxPoolAdmission            bool
 	RollupHaltOnIncompatibleProtocolVersion string
 
-	ParallelTxMode           bool // Whether to execute transaction in parallel mode when do full sync
-	ParallelTxNum            int  // Number of slot for transaction execution
-	EnableOpcodeOptimizing   bool
-	EnableParallelTxDAG      bool
-	ParallelTxDAGFile        string
-	ParallelTxUnorderedMerge bool // Whether to enable unordered merge in parallel mode
-	ParallelTxParallelMerge  bool
-
+	ParallelTxMode             bool // Whether to execute transactions in parallel mode when doing a full sync
+	ParallelTxNum              int  // Number of slots for transaction execution
+	EnableOpcodeOptimizing     bool
+	EnableParallelTxDAG        bool
+	ParallelTxDAGFile          string
+	ParallelTxUnorderedMerge   bool // Whether to enable unordered merge in parallel mode
+	ParallelTxParallelMerge    bool
+	ParallelTxDAGMaxDepthRatio float64 // fall back to the sequential processor when TxDAG depth / tx count exceeds this ratio
 }
 
 // CreateConsensusEngine creates a consensus engine for the given chain config.

From c33db541dcb2cd4e27591da3602b59505f7d9bd1 Mon Sep 17 00:00:00 2001
From: andyzhang2023
Date: Fri, 10 Jan 2025 15:34:04 +0800
Subject: [PATCH 2/5] disable by default

---
 cmd/utils/flags.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go
index 703fcaf2b..cfca6c9f4 100644
--- a/cmd/utils/flags.go
+++ b/cmd/utils/flags.go
@@ -1138,8 +1138,8 @@ Please note that --` + MetricsHTTPFlag.Name + ` must be set to start the server.
 
 	ParallelTxDAGMaxDepthRatioFlag = &cli.Float64Flag{
 		Name:     "parallel.txdag-max-depth-ratio",
-		Usage:    "A ratio to decide whether or not to execute transactions in parallel; it will fall back to the sequential processor if the depth ratio is larger than this value (default = 0.9)",
-		Value:    0.9,
+		Usage:    "A ratio to decide whether or not to execute transactions in parallel; it will fall back to the sequential processor if the depth ratio is larger than this value (default = 1.0)",
+		Value:    1.0,
 		Category: flags.VMCategory,
 	}
 

From 463a8343070a64b00b7f772c518973a925192b95 Mon Sep 17 00:00:00 2001
From: andyzhang2023
Date: Fri, 10 Jan 2025 16:06:36 +0800
Subject: [PATCH 3/5] set default value to 0.9

---
 cmd/utils/flags.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go
index cfca6c9f4..703fcaf2b 100644
--- a/cmd/utils/flags.go
+++ b/cmd/utils/flags.go
@@ -1138,8 +1138,8 @@ Please note that --` + MetricsHTTPFlag.Name + ` must be set to start the server.
 
 	ParallelTxDAGMaxDepthRatioFlag = &cli.Float64Flag{
 		Name:     "parallel.txdag-max-depth-ratio",
-		Usage:    "A ratio to decide whether or not to execute transactions in parallel; it will fall back to the sequential processor if the depth ratio is larger than this value (default = 1.0)",
-		Value:    1.0,
+		Usage:    "A ratio to decide whether or not to execute transactions in parallel; it will fall back to the sequential processor if the depth ratio is larger than this value (default = 0.9)",
+		Value:    0.9,
 		Category: flags.VMCategory,
 	}
 

From 8976cfd353f162fdd0f7e5e50dbf2f100b8c25d8 Mon Sep 17 00:00:00 2001
From: andyzhang2023
Date: Mon, 13 Jan 2025 10:06:57 +0800
Subject: [PATCH 4/5] fix the statedb when switching between serialProcessor and parallelProcessor

---
 core/blockchain.go | 29 +++++++++++++++++------------
 1 file changed, 17 insertions(+), 12 deletions(-)

diff --git a/core/blockchain.go b/core/blockchain.go
index 74c7ee5eb..71ce55ab6 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -1760,6 +1760,19 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
 	return bc.insertChain(chain, true)
 }
 
+func (bc *BlockChain) useSerialProcessor(block *types.Block) (bool, bool) {
+	// find out whether the dependencies of the block are too deep to be processed in parallel;
+	// if the dependencies are too deep, we will fall back to serial processing
+	txCount := len(block.Transactions())
+	_, depth := BuildTxLevels(txCount, bc.vmConfig.TxDAG)
+	tooDeep := float64(depth)/float64(txCount) > bc.vmConfig.TxDAGMaxDepthRatio
+	isByzantium := bc.chainConfig.IsByzantium(block.Number())
+
+	txDAGMissButNecessary := bc.vmConfig.TxDAG == nil && (bc.vmConfig.EnableParallelUnorderedMerge || bc.vmConfig.EnableTxParallelMerge)
+	useSerialProcessor := !bc.vmConfig.EnableParallelExec || txDAGMissButNecessary || tooDeep || !isByzantium
+	return useSerialProcessor, tooDeep
+}
+
 // insertChain is the internal implementation of InsertChain, which assumes that
 // 1) chains are contiguous, and 2) The chain mutex is held.
 //
@@ -1972,8 +1985,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
 	blockProcessedInParallel := false
 
 	var (
-		tooDeep bool
-		depth   int
+		tooDeep, useSerialProcessor bool
+		depth                       int
 	)
 	// skip block process if we already have the state, receipts and logs from mining work
 	if !(receiptExist && logExist && stateExist) {
@@ -1986,9 +1999,9 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
 		if bc.vmConfig.EnableParallelExec {
 			bc.parseTxDAG(block)
 		}
-		isByzantium := bc.chainConfig.IsByzantium(block.Number())
-		if bc.vmConfig.EnableParallelExec && bc.vmConfig.TxDAG != nil && bc.vmConfig.EnableTxParallelMerge && isByzantium {
+		useSerialProcessor, tooDeep = bc.useSerialProcessor(block)
+		if !useSerialProcessor {
 			statedb, err = state.NewParallel(parent.Root, bc.stateCache, bc.snaps)
 		} else {
 			statedb, err = state.New(parent.Root, bc.stateCache, bc.snaps)
 		}
@@ -2020,16 +2033,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
 
 		statedb.SetExpectedStateRoot(block.Root())
 
-		// find out whether the dependencies of the block are too deep to be processed in parallel;
-		// if the dependencies are too deep, we will fall back to serial processing
-		txCount := len(block.Transactions())
-		_, depth = BuildTxLevels(txCount, bc.vmConfig.TxDAG)
-		tooDeep = float64(depth)/float64(txCount) > bc.vmConfig.TxDAGMaxDepthRatio
-
 		// Process block using the parent state as reference point
 		pstart = time.Now()
-		txDAGMissButNecessary := bc.vmConfig.TxDAG == nil && (bc.vmConfig.EnableParallelUnorderedMerge || bc.vmConfig.EnableTxParallelMerge)
-		useSerialProcessor := !bc.vmConfig.EnableParallelExec || txDAGMissButNecessary || tooDeep
 		if useSerialProcessor {
 			receipts, logs, usedGas, err = bc.serialProcessor.Process(block, statedb, bc.vmConfig)
 			blockProcessedInParallel = false

From 0f647928d53e4c60f9a7d812464b782fd79178d3 Mon Sep 17 00:00:00 2001
From: andyzhang2023
Date: Mon, 13 Jan 2025 15:19:50 +0800
Subject: [PATCH 5/5] fix: wrong depth when building levels from a DAG which contains 'excluded' transactions

---
 core/parallel_state_scheduler.go      | 29 +++++++++++++----
 core/parallel_state_scheduler_test.go | 47 +++++++++++++++++++++++++--
 2 files changed, 68 insertions(+), 8 deletions(-)

diff --git a/core/parallel_state_scheduler.go b/core/parallel_state_scheduler.go
index 7467d58a3..d9a580df6 100644
--- a/core/parallel_state_scheduler.go
+++ b/core/parallel_state_scheduler.go
@@ -382,8 +382,12 @@ func BuildTxLevels(txCount int, dag types.TxDAG) (marked map[int]int, depth int)
 	}
 	// marked records which level each transaction should be put in
 	marked = make(map[int]int, txCount)
-	// depth is the level cursor to put the transactions in
-	depth = 0
+	var (
+		// currLevelHasTx marks if the current level has any transaction
+		currLevelHasTx bool
+	)
+
+	depth, currLevelHasTx = 0, false
 	for txIndex := 0; txIndex < txCount; txIndex++ {
 		dep := dag.TxDep(txIndex)
 		switch true {
@@ -391,11 +395,17 @@ func BuildTxLevels(txCount int, dag types.TxDAG) (marked map[int]int, depth int)
 			dep != nil && dep.CheckFlag(types.NonDependentRelFlag):
 			// excluted tx, occupies the whole level
 			// or dependent-to-all tx, occupies the whole level, too
-			marked[txIndex], depth = depth, depth+1
+			if currLevelHasTx {
+				// shift to the next level if there are transactions in the current level
+				depth++
+			}
+			marked[txIndex] = depth
+			// occupy the current level
+			depth, currLevelHasTx = depth+1, false
 
 		case dep == nil || len(dep.TxIndexes) == 0:
 			// dependent on none, just put it in the current level
-			marked[txIndex] = depth
+			marked[txIndex], currLevelHasTx = depth, true
 
 		case dep != nil && len(dep.TxIndexes) > 0:
 			// dependent on others
@@ -408,15 +418,22 @@ func BuildTxLevels(txCount int, dag types.TxDAG) (marked map[int]int, depth int)
 			}
 			if prevLevel < 0 {
 				// broken DAG, just ignored it
-				marked[txIndex] = depth
+				marked[txIndex], currLevelHasTx = depth, true
 				continue
 			}
 			// record the level of this tx
 			marked[txIndex] = prevLevel + 1
+			if marked[txIndex] > depth {
+				depth, currLevelHasTx = marked[txIndex], true
+			}
 
 		default:
 			panic("unexpected case")
 		}
 	}
-	return marked, depth
+	// check if the last level has any transaction, to avoid counting an empty trailing level
+	if !currLevelHasTx {
+		depth--
+	}
+	return marked, depth + 1
 }
diff --git a/core/parallel_state_scheduler_test.go b/core/parallel_state_scheduler_test.go
index fd78f37f7..9f0c553f5 100644
--- a/core/parallel_state_scheduler_test.go
+++ b/core/parallel_state_scheduler_test.go
@@ -601,12 +601,44 @@ func TestNewTxLevels(t *testing.T) {
 	assertEqual(levels([]uint64{1, 2, 3, 4, 5}, [][]int{nil, nil, nil, {-2}, {-2}}), [][]uint64{{1, 2, 3}, {4}, {5}}, t)
 
 	// case 9: loop-back txdag
-	assertEqual(levels([]uint64{1, 2, 3, 4}, [][]int{{1}, nil, {0}, nil}), [][]uint64{{1, 2, 4}, {3}}, t)
+	assertEqual(levels([]uint64{1, 2, 3, 4}, [][]int{{1}, nil, {0}, nil}), [][]uint64{{1, 2}, {3, 4}}, t)
+
+	// case 10: non-dependent txs + excluded txs + non-dependent txs
+	assertEqual(levels([]uint64{1, 2, 3, 4, 5}, [][]int{nil, nil, {-2}, nil, nil}), [][]uint64{{1, 2}, {3}, {4, 5}}, t)
+}
+
+func TestBuildLevels(t *testing.T) {
+	var (
+		marks map[int]int
+		depth int
+	)
+	// case 1: 1 excluded tx + n no dependencies txs + n dependencies txs + 1 all-dependencies tx
+	marks, depth = levelMarks([]uint64{1, 2, 3, 4, 5, 6, 7}, [][]int{{-1}, nil, nil, nil, {0, 1}, {2}, {-2}})
+	assertEqualMarks(marks, map[int]int{0: 0, 1: 1, 2: 1, 3: 1, 4: 2, 5: 2, 6: 3}, t)
+	if depth != 4 {
+		t.Fatalf("expected depth: 4, got depth: %d", depth)
+	}
+	// case 2: non-dependent txs + excluded txs + non-dependent txs
+	marks, depth = levelMarks([]uint64{1, 2, 3, 4, 5}, [][]int{nil, nil, {-2}, nil, nil})
+	assertEqualMarks(marks, map[int]int{0: 0, 1: 0, 2: 1, 3: 2, 4: 2}, t)
+	if depth != 3 {
+		t.Fatalf("expected depth: 3, got depth: %d", depth)
+	}
+	// case 3: (broken TxDAG) n dependent txs + 1 excluded tx + non-dependent txs
+	marks, depth = levelMarks([]uint64{1, 2, 3, 4, 5}, [][]int{{1}, {2}, {-1}, nil, nil})
+	assertEqualMarks(marks, map[int]int{0: 0, 1: 0, 2: 1, 3: 2, 4: 2}, t)
+	if depth != 3 {
+		t.Fatalf("expected depth: 3, got depth: %d", depth)
+	}
 }
 
 func TestMultiLevel(t *testing.T) {
 	// case 7: 1 excluded tx + n no dependencies txs + n dependencies txs + 1 all-dependencies tx
-	assertEqual(levels([]uint64{1, 2, 3, 4, 5, 6, 7, 8}, [][]int{nil, nil, nil, {0}, nil, {1}, nil, {2}}), [][]uint64{{1, 2, 3, 5, 7}, {4, 6, 8}}, t)
+	assertEqual(levels([]uint64{1, 2, 3, 4, 5, 6, 7, 8}, [][]int{nil, nil, nil, {0}, nil, {1}, nil, {2}}), [][]uint64{{1, 2, 3}, {4, 5, 6, 7, 8}}, t)
+}
+
+func levelMarks(nonces []uint64, txdag [][]int) (map[int]int, int) {
+	return BuildTxLevels(len(nonces), int2txdag(txdag))
 }
 
 func levels(nonces []uint64, txdag [][]int) TxLevels {
@@ -650,6 +682,17 @@ func int2txdag(txdag [][]int) types.TxDAG {
 	return &dag
 }
 
+func assertEqualMarks(actual map[int]int, expected map[int]int, t *testing.T) {
+	if len(actual) != len(expected) {
+		t.Fatalf("expected %d marks, got %d marks", len(expected), len(actual))
+	}
+	for i, mark := range actual {
+		if expected[i] != mark {
+			t.Fatalf("expected mark[%d]: %d, got mark[%d]: %d", i, expected[i], i, mark)
+		}
+	}
+}
+
 func assertEqual(actual TxLevels, expected [][]uint64, t *testing.T) {
 	if len(actual) != len(expected) {
 		t.Fatalf("expected %d levels, got %d levels", len(expected), len(actual))
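
Note on the heuristic: the series gates parallel execution on the ratio
depth(TxDAG) / len(block.Transactions()), falling back to serial processing
when it exceeds parallel.txdag-max-depth-ratio. The sketch below is a minimal,
self-contained illustration of that ratio check, under stated assumptions:
toyDep, buildLevels, and maxDepthRatio are hypothetical stand-ins for
types.TxDep, BuildTxLevels, and vmConfig.TxDAGMaxDepthRatio, and the sketch
ignores the ExcludedTxFlag/NonDependentRelFlag special cases that the real
BuildTxLevels handles.

package main

import "fmt"

// toyDep lists the indexes of earlier transactions that a tx depends on
// (nil means no dependencies). Hypothetical stand-in for types.TxDep.
type toyDep []int

// buildLevels assigns each tx the smallest level strictly greater than the
// levels of all of its dependencies, and returns the number of levels (depth).
func buildLevels(deps []toyDep) (levels []int, depth int) {
	levels = make([]int, len(deps))
	for i, dep := range deps {
		lvl := 0
		for _, d := range dep {
			// ignore forward/broken references, as the patched code does
			if d >= 0 && d < i && levels[d]+1 > lvl {
				lvl = levels[d] + 1
			}
		}
		levels[i] = lvl
		if lvl+1 > depth {
			depth = lvl + 1
		}
	}
	return levels, depth
}

func main() {
	const maxDepthRatio = 0.9 // mirrors the flag's default value

	// Chain-shaped block: every tx depends on its predecessor, so the DAG is
	// as deep as the block and parallel execution cannot win.
	chain := []toyDep{nil, {0}, {1}, {2}, {3}}
	_, depth := buildLevels(chain)
	ratio := float64(depth) / float64(len(chain)) // 5/5 = 1.00 -> fall back to serial
	fmt.Printf("chain: depth=%d ratio=%.2f fallback=%v\n", depth, ratio, ratio > maxDepthRatio)

	// Wide block: txs 1..4 depend only on tx 0, giving two levels, so most
	// of the block can run in parallel.
	wide := []toyDep{nil, {0}, {0}, {0}, {0}}
	_, depth = buildLevels(wide)
	ratio = float64(depth) / float64(len(wide)) // 2/5 = 0.40 -> keep parallel
	fmt.Printf("wide:  depth=%d ratio=%.2f fallback=%v\n", depth, ratio, ratio > maxDepthRatio)
}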