Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pevm: fallback to sequencial processor when the TxDAG is too deep #251

Open
wants to merge 3 commits into
base: feature/TxDAG-PEVM
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ var (
utils.ParallelTxDAGFlag,
utils.ParallelTxDAGFileFlag,
utils.ParallelTxDAGSenderPrivFlag,
utils.ParallelTxDATMaxDepthRatioFlag,
configFileFlag,
utils.LogDebugFlag,
utils.LogBacktraceAtFlag,
Expand Down
11 changes: 11 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -1136,6 +1136,13 @@ Please note that --` + MetricsHTTPFlag.Name + ` must be set to start the server.
Category: flags.VMCategory,
}

ParallelTxDATMaxDepthRatioFlag = &cli.Float64Flag{
Name: "parallel.txdag-max-depth-ratio",
Usage: "A ratio to decide whether or not to execute transactions in parallel, it will fallback to sequencial processor if the depth is larger than this value (default = 0.9)",
Value: 0.9,
Category: flags.VMCategory,
}

VMOpcodeOptimizeFlag = &cli.BoolFlag{
Name: "vm.opcode.optimize",
Usage: "enable opcode optimization",
Expand Down Expand Up @@ -2057,6 +2064,10 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
cfg.ParallelTxDAGFile = ctx.String(ParallelTxDAGFileFlag.Name)
}

if ctx.IsSet(ParallelTxDATMaxDepthRatioFlag.Name) {
cfg.ParallelTxDAGMaxDepthRatio = ctx.Float64(ParallelTxDATMaxDepthRatioFlag.Name)
}

if ctx.IsSet(ParallelTxDAGSenderPrivFlag.Name) {
priHex := ctx.String(ParallelTxDAGSenderPrivFlag.Name)
if cfg.Miner.ParallelTxDAGSenderPriv, err = crypto.HexToECDSA(priHex); err != nil {
Expand Down
16 changes: 14 additions & 2 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1971,6 +1971,10 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
)

blockProcessedInParallel := false
var (
tooDeep bool
depth int
)
// skip block process if we already have the state, receipts and logs from mining work
if !(receiptExist && logExist && stateExist) {
// Retrieve the parent block and it's state to execute on top
Expand Down Expand Up @@ -2016,10 +2020,16 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)

statedb.SetExpectedStateRoot(block.Root())

// findout whether or not the dependencies of the block are too deep to be processed
// if the dependencies are too deep, we will fallback to serial processing
txCount := len(block.Transactions())
_, depth = BuildTxLevels(txCount, bc.vmConfig.TxDAG)
tooDeep = float64(depth)/float64(txCount) > bc.vmConfig.TxDAGMaxDepthRatio

// Process block using the parent state as reference point
pstart = time.Now()
txDAGMissButNecessary := bc.vmConfig.TxDAG == nil && (bc.vmConfig.EnableParallelUnorderedMerge || bc.vmConfig.EnableTxParallelMerge)
useSerialProcessor := !bc.vmConfig.EnableParallelExec || txDAGMissButNecessary
useSerialProcessor := !bc.vmConfig.EnableParallelExec || txDAGMissButNecessary || tooDeep
if useSerialProcessor {
receipts, logs, usedGas, err = bc.serialProcessor.Process(block, statedb, bc.vmConfig)
blockProcessedInParallel = false
Expand Down Expand Up @@ -2143,7 +2153,9 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
"accountUpdates", common.PrettyDuration(timers.AccountUpdates),
"storageUpdates", common.PrettyDuration(timers.StorageUpdates),
"accountHashes", common.PrettyDuration(timers.AccountHashes),
"storageHashes", common.PrettyDuration(timers.StorageHashes))
"storageHashes", common.PrettyDuration(timers.StorageHashes),
"tooDeep", tooDeep, "depth", depth,
)

// Write the block to the chain and get the status.
var (
Expand Down
45 changes: 27 additions & 18 deletions core/parallel_state_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,8 +350,6 @@ func (tl TxLevel) predictTxDAG(dag types.TxDAG) {

func NewTxLevels(all []*PEVMTxRequest, dag types.TxDAG) TxLevels {
var levels TxLevels = make(TxLevels, 0, 8)
var currLevel int = 0

var enlargeLevelsIfNeeded = func(currLevel int, levels *TxLevels) {
if len(*levels) <= currLevel {
for i := len(*levels); i <= currLevel; i++ {
Expand All @@ -367,22 +365,37 @@ func NewTxLevels(all []*PEVMTxRequest, dag types.TxDAG) TxLevels {
return TxLevels{all}
}

marked := make(map[int]int, len(all))
for _, tx := range all {
dep := dag.TxDep(tx.txIndex)
// build the levels from the DAG
marked, _ := BuildTxLevels(len(all), dag)
// put the transactions into the levels
for txIndex, tx := range all {
level := marked[txIndex]
enlargeLevelsIfNeeded(level, &levels)
levels[level] = append(levels[level], tx)
}
return levels
}

func BuildTxLevels(txCount int, dag types.TxDAG) (marked map[int]int, depth int) {
if dag == nil {
return make(map[int]int), 0
}
// marked is used to record which level that each transaction should be put
marked = make(map[int]int, txCount)
// currLevel is the level cursor to put the transactions in
depth = 0
for txIndex := 0; txIndex < txCount; txIndex++ {
dep := dag.TxDep(txIndex)
switch true {
case dep != nil && dep.CheckFlag(types.ExcludedTxFlag),
dep != nil && dep.CheckFlag(types.NonDependentRelFlag):
// excluted tx, occupies the whole level
// or dependent-to-all tx, occupies the whole level, too
levels = append(levels, TxLevel{tx})
marked[tx.txIndex], currLevel = len(levels)-1, len(levels)
marked[txIndex], depth = depth, depth+1

case dep == nil || len(dep.TxIndexes) == 0:
// dependent on none
enlargeLevelsIfNeeded(currLevel, &levels)
levels[currLevel] = append(levels[currLevel], tx)
marked[tx.txIndex] = currLevel
// dependent on none, just put it in the current level
marked[txIndex] = depth

case dep != nil && len(dep.TxIndexes) > 0:
// dependent on others
Expand All @@ -395,19 +408,15 @@ func NewTxLevels(all []*PEVMTxRequest, dag types.TxDAG) TxLevels {
}
if prevLevel < 0 {
// broken DAG, just ignored it
enlargeLevelsIfNeeded(currLevel, &levels)
levels[currLevel] = append(levels[currLevel], tx)
marked[tx.txIndex] = currLevel
marked[txIndex] = depth
continue
}
enlargeLevelsIfNeeded(prevLevel+1, &levels)
levels[prevLevel+1] = append(levels[prevLevel+1], tx)
// record the level of this tx
marked[tx.txIndex] = prevLevel + 1
marked[txIndex] = prevLevel + 1

default:
panic("unexpected case")
}
}
return levels
return marked, depth
}
1 change: 1 addition & 0 deletions core/vm/interpreter.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type Config struct {
TxDAG types.TxDAG
EnableParallelUnorderedMerge bool // Whether to enable unordered merge in parallel mode
EnableTxParallelMerge bool // Whether to enable parallel merge in parallel mode
TxDAGMaxDepthRatio float64
}

// ScopeContext contains the things that are per-call, such as stack and memory,
Expand Down
1 change: 1 addition & 0 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
EnableTxParallelMerge: config.ParallelTxParallelMerge,
ParallelTxNum: config.ParallelTxNum,
EnableOpcodeOptimizations: config.EnableOpcodeOptimizing,
TxDAGMaxDepthRatio: config.ParallelTxDAGMaxDepthRatio,
}
cacheConfig = &core.CacheConfig{
TrieCleanLimit: config.TrieCleanCache,
Expand Down
16 changes: 8 additions & 8 deletions eth/ethconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,14 +219,14 @@ type Config struct {
RollupDisableTxPoolAdmission bool
RollupHaltOnIncompatibleProtocolVersion string

ParallelTxMode bool // Whether to execute transaction in parallel mode when do full sync
ParallelTxNum int // Number of slot for transaction execution
EnableOpcodeOptimizing bool
EnableParallelTxDAG bool
ParallelTxDAGFile string
ParallelTxUnorderedMerge bool // Whether to enable unordered merge in parallel mode
ParallelTxParallelMerge bool

ParallelTxMode bool // Whether to execute transaction in parallel mode when do full sync
ParallelTxNum int // Number of slot for transaction execution
EnableOpcodeOptimizing bool
EnableParallelTxDAG bool
ParallelTxDAGFile string
ParallelTxUnorderedMerge bool // Whether to enable unordered merge in parallel mode
ParallelTxParallelMerge bool
ParallelTxDAGMaxDepthRatio float64
}

// CreateConsensusEngine creates a consensus engine for the given chain config.
Expand Down
Loading