diff --git a/eth/stagedsync/stage_execute_block_eecutor.go b/eth/stagedsync/stage_execute_block_eecutor.go new file mode 100644 index 00000000000..6e1cf34c9f7 --- /dev/null +++ b/eth/stagedsync/stage_execute_block_eecutor.go @@ -0,0 +1,366 @@ +package stagedsync + +import ( + "context" + "errors" + "fmt" + + "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon-lib/kv/dbutils" + "github.com/ledgerwatch/erigon/consensus/misc" + "github.com/ledgerwatch/erigon/core" + "github.com/ledgerwatch/erigon/core/rawdb" + "github.com/ledgerwatch/erigon/core/state" + "github.com/ledgerwatch/erigon/core/types" + "github.com/ledgerwatch/erigon/core/vm" + "github.com/ledgerwatch/erigon/eth/calltracer" + "github.com/ledgerwatch/erigon/eth/tracers/logger" + "github.com/ledgerwatch/erigon/zk/hermez_db" + rawdbZk "github.com/ledgerwatch/erigon/zk/rawdb" + "github.com/ledgerwatch/log/v3" +) + +var ( + ErrExecutionError = fmt.Errorf("execution error") +) + +type hermezDb interface { + WriteBlockInfoRoot(blockNum uint64, root common.Hash) error + SetNewTx(tx kv.RwTx) + state.ReadOnlyHermezDb +} + +type blockExecutor struct { + ctx context.Context + logPrefix string + cfg ExecuteBlockCfg + tx kv.RwTx + batch kv.StatelessRwTx + initialCycle bool + nextStagesExpectData bool + + // set internelly + hermezDb hermezDb + stateStream bool + getHeader func(hash common.Hash, number uint64) *types.Header + getTracer func(txIndex int, txHash common.Hash) (vm.EVMLogger, error) + historyPruneTo uint64 + receiptsPruneTo uint64 + callTracesPruneTo uint64 + from uint64 + + // these change on each block + prevBlockRoot common.Hash + prevBlockHash common.Hash + datastreamBlockHash common.Hash + block *types.Block + senders []common.Address + currentStateGas uint64 +} + +func NewBlockExecutor( + ctx context.Context, + logPrefix string, + cfg ExecuteBlockCfg, + tx kv.RwTx, + batch kv.StatelessRwTx, + initialCycle bool, + nextStagesExpectData bool, 
+) *blockExecutor { + return &blockExecutor{ + ctx: ctx, + logPrefix: logPrefix, + cfg: cfg, + tx: tx, + batch: batch, + initialCycle: initialCycle, + nextStagesExpectData: nextStagesExpectData, + hermezDb: hermez_db.NewHermezDb(tx), + } +} + +func (be *blockExecutor) Init(from, to uint64) (err error) { + be.from = from + be.stateStream = !be.initialCycle && be.cfg.stateStream && to-from < stateStreamLimit + + be.prevBlockRoot, be.prevBlockHash, err = be.getBlockHashValues(from) + if err != nil { + return fmt.Errorf("getBlockHashValues: %w", err) + } + + // where the magic happens + be.getHeader = func(hash common.Hash, number uint64) *types.Header { + h, _ := be.cfg.blockReader.Header(context.Background(), be.tx, hash, number) + return h + } + + be.getTracer = func(txIndex int, txHash common.Hash) (vm.EVMLogger, error) { + // return logger.NewJSONFileLogger(&logger.LogConfig{}, txHash.String()), nil + return logger.NewStructLogger(&logger.LogConfig{}), nil + } + + be.historyPruneTo = be.cfg.prune.History.PruneTo(to) + be.receiptsPruneTo = be.cfg.prune.Receipts.PruneTo(to) + be.callTracesPruneTo = be.cfg.prune.CallTraces.PruneTo(to) + + return nil +} + +func (be *blockExecutor) SetNewTx(tx kv.RwTx, batch kv.StatelessRwTx) { + be.tx = tx + be.batch = batch + + be.hermezDb = hermez_db.NewHermezDb(tx) +} + +func (be *blockExecutor) GetProgress() uint64 { + if be.block != nil { + return be.block.NumberU64() + } + + return be.from +} + +func (be *blockExecutor) ExecuteBlock(blockNum uint64) error { + //fetch values pre execute + if err := be.getPreexecuteValues(blockNum); err != nil { + return fmt.Errorf("getPreexecuteValues: %w", err) + } + + execRs, err := be.executeBlock() + if err != nil { + if !errors.Is(err, context.Canceled) { + log.Warn(fmt.Sprintf("[%s] Execution failed", be.logPrefix), "block", blockNum, "hash", be.datastreamBlockHash.Hex(), "err", err) + if be.cfg.hd != nil { + be.cfg.hd.ReportBadHeaderPoS(be.datastreamBlockHash, be.block.ParentHash()) + } + 
if be.cfg.badBlockHalt { + return fmt.Errorf("executeBlockZk: %w", err) + } + } + return fmt.Errorf("%w: %w", ErrExecutionError, err) + } + + if execRs.BlockInfoTree != nil { + if err = be.hermezDb.WriteBlockInfoRoot(blockNum, *execRs.BlockInfoTree); err != nil { + return fmt.Errorf("WriteBlockInfoRoot: %w", err) + } + } + + // exec loop variables + header := be.block.HeaderNoCopy() + header.GasUsed = uint64(execRs.GasUsed) + header.ReceiptHash = types.DeriveSha(execRs.Receipts) + header.Bloom = execRs.Bloom + // do not move bove the header setting - hash will differ + be.prevBlockRoot = be.block.Root() + be.prevBlockHash = header.Hash() + be.currentStateGas = be.currentStateGas + header.GasUsed + + //commit values post execute + if err := be.postExecuteCommitValues(); err != nil { + return fmt.Errorf("postExecuteCommitValues: %w", err) + } + + return nil +} + +func (be *blockExecutor) getBlockHashValues(number uint64) (common.Hash, common.Hash, error) { + prevheaderHash, err := rawdb.ReadCanonicalHash(be.tx, number) + if err != nil { + return common.Hash{}, common.Hash{}, err + } + header, err := be.cfg.blockReader.Header(be.ctx, be.tx, prevheaderHash, number) + if err != nil { + return common.Hash{}, common.Hash{}, err + } + + return header.Root, prevheaderHash, nil +} + +func (be *blockExecutor) executeBlock() (execRs *core.EphemeralExecResultZk, err error) { + blockNum := be.block.NumberU64() + + // Incremental move of next stages depend on fully written ChangeSets, Receipts, CallTraceSet + writeChangeSets := be.nextStagesExpectData || blockNum > be.historyPruneTo + writeReceipts := be.nextStagesExpectData || blockNum > be.receiptsPruneTo + writeCallTraces := be.nextStagesExpectData || blockNum > be.callTracesPruneTo + + stateReader, stateWriter, err := newStateReaderWriter( + be.batch, + be.tx, + be.block, + writeChangeSets, + be.cfg.accumulator, + be.cfg.blockReader, + be.stateStream, + ) + if err != nil { + return nil, fmt.Errorf("newStateReaderWriter: %w", 
err) + } + + callTracer := calltracer.NewCallTracer() + be.cfg.vmConfig.Debug = true + be.cfg.vmConfig.Tracer = callTracer + + getHashFn := core.GetHashFn(be.block.Header(), be.getHeader) + if execRs, err = core.ExecuteBlockEphemerallyZk( + be.cfg.chainConfig, + be.cfg.vmConfig, + getHashFn, + be.cfg.engine, + be.block, + stateReader, + stateWriter, + ChainReaderImpl{ + config: be.cfg.chainConfig, + tx: be.tx, + blockReader: be.cfg.blockReader, + }, + be.getTracer, + be.hermezDb, + &be.prevBlockRoot, + ); err != nil { + return nil, fmt.Errorf("ExecuteBlockEphemerallyZk: %w", err) + } + + if writeReceipts { + if err := rawdb.AppendReceipts(be.tx, blockNum, execRs.Receipts); err != nil { + return nil, fmt.Errorf("AppendReceipts: %w", err) + } + + stateSyncReceipt := execRs.StateSyncReceipt + if stateSyncReceipt != nil && stateSyncReceipt.Status == types.ReceiptStatusSuccessful { + if err := rawdb.WriteBorReceipt(be.tx, blockNum, stateSyncReceipt); err != nil { + return nil, fmt.Errorf("WriteBorReceipt: %w", err) + } + } + } + + if be.cfg.changeSetHook != nil { + if hasChangeSet, ok := stateWriter.(HasChangeSetWriter); ok { + be.cfg.changeSetHook(blockNum, hasChangeSet.ChangeSetWriter()) + } + } + if writeCallTraces { + if err := callTracer.WriteToDb(be.tx, be.block, *be.cfg.vmConfig); err != nil { + return nil, fmt.Errorf("WriteToDb: %w", err) + } + } + + return execRs, nil +} + +// gets the pre-execute values for a block and sets the previous block hash +func (be *blockExecutor) getPreexecuteValues(blockNum uint64) error { + preExecuteHeaderHash, err := rawdb.ReadCanonicalHash(be.tx, blockNum) + if err != nil { + return fmt.Errorf("ReadCanonicalHash: %w", err) + } + + block, senders, err := be.cfg.blockReader.BlockWithSenders(be.ctx, be.tx, preExecuteHeaderHash, blockNum) + if err != nil { + return fmt.Errorf("BlockWithSenders: %w", err) + } + + if block == nil { + return fmt.Errorf("empty block blocknum: %d", blockNum) + } + + block.HeaderNoCopy().ParentHash = 
be.prevBlockHash + + if be.cfg.chainConfig.IsLondon(blockNum) { + parentHeader, err := be.cfg.blockReader.Header(be.ctx, be.tx, be.prevBlockHash, blockNum-1) + if err != nil { + return fmt.Errorf("cfg.blockReader.Header: %w", err) + } + block.HeaderNoCopy().BaseFee = misc.CalcBaseFeeZk(be.cfg.chainConfig, parentHeader) + } + + be.datastreamBlockHash = preExecuteHeaderHash + be.block = block + be.senders = senders + + return nil +} + +func (be *blockExecutor) postExecuteCommitValues() error { + header := be.block.Header() + blockHash := header.Hash() + blockNum := be.block.NumberU64() + + // if datastream hash was wrong, remove old data + if blockHash != be.datastreamBlockHash { + if be.cfg.chainConfig.IsForkId9Elderberry2(blockNum) { + log.Warn( + fmt.Sprintf("[%s] Blockhash mismatch", be.logPrefix), + "blockNumber", blockNum, + "datastreamBlockHash", be.datastreamBlockHash, + "calculatedBlockHash", blockHash, + ) + } + if err := rawdbZk.DeleteSenders(be.tx, be.datastreamBlockHash, blockNum); err != nil { + return fmt.Errorf("DeleteSenders: %w", err) + } + if err := rawdbZk.DeleteHeader(be.tx, be.datastreamBlockHash, blockNum); err != nil { + return fmt.Errorf("DeleteHeader: %w", err) + } + + bodyForStorage, err := rawdb.ReadBodyForStorageByKey(be.tx, dbutils.BlockBodyKey(blockNum, be.datastreamBlockHash)) + if err != nil { + return fmt.Errorf("ReadBodyForStorageByKey: %w", err) + } + + if err := rawdb.DeleteBodyAndTransactions(be.tx, blockNum, be.datastreamBlockHash); err != nil { + return fmt.Errorf("DeleteBodyAndTransactions: %w", err) + } + if err := rawdb.WriteBodyAndTransactions(be.tx, blockHash, blockNum, be.block.Transactions(), bodyForStorage); err != nil { + return fmt.Errorf("WriteBodyAndTransactions: %w", err) + } + + // [zkevm] senders were saved in stage_senders for headerHashes based on incomplete headers + // in stage execute we complete the headers and senders should be moved to the correct headerHash + // also we should delete other data based on 
the old hash, since it is inaccessible now
"github.com/ledgerwatch/erigon/zk/hermez_db" @@ -26,17 +23,20 @@ import ( "github.com/ledgerwatch/erigon/common/math" "github.com/ledgerwatch/erigon/core/rawdb" - "github.com/ledgerwatch/erigon/core/state" - "github.com/ledgerwatch/erigon/core/types" - "github.com/ledgerwatch/erigon/core/vm" - "github.com/ledgerwatch/erigon/eth/calltracer" "github.com/ledgerwatch/erigon/eth/stagedsync/stages" - "github.com/ledgerwatch/erigon/eth/tracers/logger" rawdbZk "github.com/ledgerwatch/erigon/zk/rawdb" "github.com/ledgerwatch/erigon/zk/utils" ) -func SpawnExecuteBlocksStageZk(s *StageState, u Unwinder, tx kv.RwTx, toBlock uint64, ctx context.Context, cfg ExecuteBlockCfg, initialCycle bool) (err error) { +func SpawnExecuteBlocksStageZk( + s *StageState, + u Unwinder, + tx kv.RwTx, + toBlock uint64, + ctx context.Context, + cfg ExecuteBlockCfg, + initialCycle bool, +) (err error) { if cfg.historyV3 { if err = ExecBlockV3(s, u, wrap.TxContainer{Tx: tx}, toBlock, ctx, cfg, initialCycle, log.New()); err != nil { return fmt.Errorf("ExecBlockV3: %w", err) @@ -65,37 +65,18 @@ func SpawnExecuteBlocksStageZk(s *StageState, u Unwinder, tx kv.RwTx, toBlock ui defer tx.Rollback() } - nextStageProgress, err := stages.GetStageProgress(tx, stages.HashState) - if err != nil { - return fmt.Errorf("getStageProgress: %w", err) - } - nextStagesExpectData := nextStageProgress > 0 // Incremental move of next stages depend on fully written ChangeSets, Receipts, CallTraceSet - - var currentStateGas uint64 // used for batch commits of state - // Transform batch_size limit into Ggas - gasState := uint64(cfg.batchSize) * uint64(datasize.KB) * 2 - - hermezDb := hermez_db.NewHermezDb(tx) - - var batch kv.PendingMutations // state is stored through ethdb batches - batch = membatch.NewHashBatch(tx, quit, cfg.dirs.Tmp, log.New()) + var batch kv.PendingMutations = membatch.NewHashBatch(tx, quit, cfg.dirs.Tmp, log.New()) // avoids stacking defers within the loop defer func() { batch.Close() }() + hermezDb := 
hermez_db.NewHermezDb(tx) if err := utils.UpdateZkEVMBlockCfg(cfg.chainConfig, hermezDb, s.LogPrefix(), cfg.zk.LogLevel == log.LvlTrace); err != nil { return fmt.Errorf("UpdateZkEVMBlockCfg: %w", err) } - eridb := erigon_db.NewErigonDb(tx) - - prevBlockRoot, prevBlockHash, err := getBlockHashValues(cfg, ctx, tx, s.BlockNumber) - if err != nil { - return fmt.Errorf("getBlockHashValues: %w", err) - } - to, total, err := getExecRange(cfg, tx, s.BlockNumber, toBlock, s.LogPrefix()) if err != nil { return fmt.Errorf("getExecRange: %w", err) @@ -103,15 +84,39 @@ func SpawnExecuteBlocksStageZk(s *StageState, u Unwinder, tx kv.RwTx, toBlock ui log.Info(fmt.Sprintf("[%s] Blocks execution", s.LogPrefix()), "from", s.BlockNumber, "to", to) - stateStream := !initialCycle && cfg.stateStream && to-s.BlockNumber < stateStreamLimit - - logger := utils.NewTxGasLogger(logInterval, s.BlockNumber, total, gasState, s.LogPrefix(), &batch, tx, stages.SyncMetrics[stages.Execution]) + // Transform batch_size limit into Ggas + gasState := uint64(cfg.batchSize) * uint64(datasize.KB) * 2 + logger := utils.NewTxGasLogger( + logInterval, + s.BlockNumber, + total, + gasState, + s.LogPrefix(), + &batch, + tx, + stages.SyncMetrics[stages.Execution], + ) logger.Start() defer logger.Stop() - stageProgress := s.BlockNumber + nextStageProgress, err := stages.GetStageProgress(tx, stages.HashState) + if err != nil { + return fmt.Errorf("getStageProgress: %w", err) + } + + blockExecutor := NewBlockExecutor( + ctx, + s.LogPrefix(), + cfg, + tx, + batch, + initialCycle, + nextStageProgress > 0, // Incremental move of next stages depend on fully written ChangeSets, Receipts, CallTraceSet + ) + blockExecutor.Init(s.BlockNumber, to) + var stoppedErr error -Loop: + for blockNum := s.BlockNumber + 1; blockNum <= to; blockNum++ { if cfg.zk.SyncLimit > 0 && blockNum > cfg.zk.SyncLimit { log.Info(fmt.Sprintf("[%s] Sync limit reached", s.LogPrefix()), "block", blockNum) @@ -122,57 +127,27 @@ Loop: break } - //fetch 
values pre execute - datastreamBlockHash, block, senders, err := getPreexecuteValues(cfg, ctx, tx, blockNum, prevBlockHash) - if err != nil { - stoppedErr = fmt.Errorf("getPreexecuteValues: %w", err) - break - } - - // Incremental move of next stages depend on fully written ChangeSets, Receipts, CallTraceSet - writeChangeSets := nextStagesExpectData || blockNum > cfg.prune.History.PruneTo(to) - writeReceipts := nextStagesExpectData || blockNum > cfg.prune.Receipts.PruneTo(to) - writeCallTraces := nextStagesExpectData || blockNum > cfg.prune.CallTraces.PruneTo(to) - - execRs, err := executeBlockZk(block, &prevBlockRoot, tx, batch, cfg, *cfg.vmConfig, writeChangeSets, writeReceipts, writeCallTraces, initialCycle, stateStream, hermezDb) - if err != nil { - if !errors.Is(err, context.Canceled) { - log.Warn(fmt.Sprintf("[%s] Execution failed", s.LogPrefix()), "block", blockNum, "hash", datastreamBlockHash.Hex(), "err", err) - if cfg.hd != nil { - cfg.hd.ReportBadHeaderPoS(datastreamBlockHash, block.ParentHash()) - } - if cfg.badBlockHalt { - return fmt.Errorf("executeBlockZk: %w", err) - } + if err := blockExecutor.ExecuteBlock(blockNum); err != nil { + if !errors.Is(err, ErrExecutionError) { + return fmt.Errorf("executeBlock: %w", err) } - u.UnwindTo(blockNum-1, UnwindReason{Block: &datastreamBlockHash}) - break Loop - } - if execRs.BlockInfoTree != nil { - if err = hermezDb.WriteBlockInfoRoot(blockNum, *execRs.BlockInfoTree); err != nil { - return fmt.Errorf("WriteBlockInfoRoot: %w", err) - } + u.UnwindTo(blockNum-1, UnwindReason{Block: &blockExecutor.datastreamBlockHash}) + break } - // exec loop variables - header := block.HeaderNoCopy() - header.GasUsed = uint64(execRs.GasUsed) - header.ReceiptHash = types.DeriveSha(execRs.Receipts) - header.Bloom = execRs.Bloom - // don't move above header values setting - wrong hash will be calculated - prevBlockHash = header.Hash() - prevBlockRoot = header.Root - stageProgress = blockNum - currentStateGas = currentStateGas + 
header.GasUsed - - logger.AddBlock(uint64(block.Transactions().Len()), stageProgress, currentStateGas, blockNum) + logger.AddBlock( + uint64(blockExecutor.block.Transactions().Len()), + blockExecutor.GetProgress(), + blockExecutor.currentStateGas, + blockNum, + ) // should update progress if batch.BatchSize() >= int(cfg.batchSize) { - log.Info("Committed State", "gas reached", currentStateGas, "gasTarget", gasState) - currentStateGas = 0 - if err = s.Update(batch, stageProgress); err != nil { + log.Info("Committed State", "gas reached", blockExecutor.currentStateGas, "gasTarget", gasState) + blockExecutor.currentStateGas = 0 + if err = s.Update(batch, blockExecutor.GetProgress()); err != nil { return fmt.Errorf("s.Update: %w", err) } if err = batch.Flush(ctx, tx); err != nil { @@ -187,26 +162,21 @@ Loop: return fmt.Errorf("cfg.db.BeginRw: %w", err) } defer tx.Rollback() - eridb = erigon_db.NewErigonDb(tx) + logger.SetTx(tx) + batch = membatch.NewHashBatch(tx, quit, cfg.dirs.Tmp, log.New()) + blockExecutor.SetNewTx(tx, batch) } - batch = membatch.NewHashBatch(tx, quit, cfg.dirs.Tmp, log.New()) - hermezDb = hermez_db.NewHermezDb(tx) - } - - //commit values post execute - if err := postExecuteCommitValues(s.LogPrefix(), cfg, tx, eridb, batch, datastreamBlockHash, block, senders); err != nil { - return fmt.Errorf("postExecuteCommitValues: %w", err) } } - if err = s.Update(batch, stageProgress); err != nil { + if err = s.Update(batch, blockExecutor.GetProgress()); err != nil { return fmt.Errorf("s.Update: %w", err) } // we need to artificially update the headers stage here as well to ensure that notifications // can fire at the end of the stage loop and inform RPC subscriptions of new blocks for example - if err = stages.SaveStageProgress(tx, stages.Headers, stageProgress); err != nil { + if err = stages.SaveStageProgress(tx, stages.Headers, blockExecutor.GetProgress()); err != nil { return fmt.Errorf("SaveStageProgress: %w", err) } @@ -215,37 +185,23 @@ Loop: } // 
stageProgress is latest processsed block number - if _, err = rawdb.IncrementStateVersionByBlockNumberIfNeeded(tx, stageProgress); err != nil { + if _, err = rawdb.IncrementStateVersionByBlockNumberIfNeeded(tx, blockExecutor.GetProgress()); err != nil { return fmt.Errorf("IncrementStateVersionByBlockNumberIfNeeded: %w", err) } if !useExternalTx { - log.Info(fmt.Sprintf("[%s] Commiting DB transaction...", s.LogPrefix()), "block", stageProgress) + log.Info(fmt.Sprintf("[%s] Commiting DB transaction...", s.LogPrefix()), "block", blockExecutor.GetProgress()) if err = tx.Commit(); err != nil { return fmt.Errorf("tx.Commit: %w", err) } } - log.Info(fmt.Sprintf("[%s] Completed on", s.LogPrefix()), "block", stageProgress) + log.Info(fmt.Sprintf("[%s] Completed on", s.LogPrefix()), "block", blockExecutor.GetProgress()) return stoppedErr } -// returns the block's blockHash and header stateroot -func getBlockHashValues(cfg ExecuteBlockCfg, ctx context.Context, tx kv.RwTx, number uint64) (common.Hash, common.Hash, error) { - prevheaderHash, err := rawdb.ReadCanonicalHash(tx, number) - if err != nil { - return common.Hash{}, common.Hash{}, err - } - header, err := cfg.blockReader.Header(ctx, tx, prevheaderHash, number) - if err != nil { - return common.Hash{}, common.Hash{}, err - } - - return header.Root, prevheaderHash, nil -} - // returns calculated "to" block number for execution and the total blocks to be executed func getExecRange(cfg ExecuteBlockCfg, tx kv.RwTx, stageProgress, toBlock uint64, logPrefix string) (uint64, uint64, error) { if cfg.zk.DebugLimit > 0 { @@ -289,190 +245,14 @@ func getExecRange(cfg ExecuteBlockCfg, tx kv.RwTx, stageProgress, toBlock uint64 return to, total, nil } -// gets the pre-execute values for a block and sets the previous block hash -func getPreexecuteValues(cfg ExecuteBlockCfg, ctx context.Context, tx kv.RwTx, blockNum uint64, prevBlockHash common.Hash) (common.Hash, *types.Block, []common.Address, error) { - preExecuteHeaderHash, err := 
rawdb.ReadCanonicalHash(tx, blockNum) - if err != nil { - return common.Hash{}, nil, nil, fmt.Errorf("ReadCanonicalHash: %w", err) - } - - block, senders, err := cfg.blockReader.BlockWithSenders(ctx, tx, preExecuteHeaderHash, blockNum) - if err != nil { - return common.Hash{}, nil, nil, fmt.Errorf("BlockWithSenders: %w", err) - } - - if block == nil { - return common.Hash{}, nil, nil, fmt.Errorf("empty block blocknum: %d", blockNum) - } - - block.HeaderNoCopy().ParentHash = prevBlockHash - - if cfg.chainConfig.IsLondon(blockNum) { - parentHeader, err := cfg.blockReader.Header(ctx, tx, prevBlockHash, blockNum-1) - if err != nil { - return common.Hash{}, nil, nil, fmt.Errorf("cfg.blockReader.Header: %w", err) - } - block.HeaderNoCopy().BaseFee = misc.CalcBaseFeeZk(cfg.chainConfig, parentHeader) - } - - return preExecuteHeaderHash, block, senders, nil -} - -func postExecuteCommitValues( - logPrefix string, - cfg ExecuteBlockCfg, - tx kv.RwTx, - eridb *erigon_db.ErigonDb, - batch kv.PendingMutations, - datastreamBlockHash common.Hash, - block *types.Block, - senders []common.Address, -) error { - header := block.Header() - blockHash := header.Hash() - blockNum := block.NumberU64() - - // if datastream hash was wrong, remove old data - if blockHash != datastreamBlockHash { - if cfg.chainConfig.IsForkId9Elderberry2(blockNum) { - log.Warn(fmt.Sprintf("[%s] Blockhash mismatch", logPrefix), "blockNumber", blockNum, "datastreamBlockHash", datastreamBlockHash, "calculatedBlockHash", blockHash) - } - if err := rawdbZk.DeleteSenders(tx, datastreamBlockHash, blockNum); err != nil { - return fmt.Errorf("DeleteSenders: %w", err) - } - if err := rawdbZk.DeleteHeader(tx, datastreamBlockHash, blockNum); err != nil { - return fmt.Errorf("DeleteHeader: %w", err) - } - - bodyForStorage, err := rawdb.ReadBodyForStorageByKey(tx, dbutils.BlockBodyKey(blockNum, datastreamBlockHash)) - if err != nil { - return fmt.Errorf("ReadBodyForStorageByKey: %w", err) - } - - if err := 
rawdb.DeleteBodyAndTransactions(tx, blockNum, datastreamBlockHash); err != nil { - return fmt.Errorf("DeleteBodyAndTransactions: %w", err) - } - if err := rawdb.WriteBodyAndTransactions(tx, blockHash, blockNum, block.Transactions(), bodyForStorage); err != nil { - return fmt.Errorf("WriteBodyAndTransactions: %w", err) - } - - // [zkevm] senders were saved in stage_senders for headerHashes based on incomplete headers - // in stage execute we complete the headers and senders should be moved to the correct headerHash - // also we should delete other data based on the old hash, since it is unaccessable now - if err := rawdb.WriteSenders(tx, blockHash, blockNum, senders); err != nil { - return fmt.Errorf("failed to write senders: %w", err) - } - } - - // TODO: how can we store this data right first time? Or mop up old data as we're currently duping storage - /* - , \ / , - / \ )\__/( / \ - / \ (_\ /_) / \ - ____/_____\__\@ @/___/_____\____ - | |\../| | - | \VV/ | - | ZKEVM duping storage | - |_________________________________| - | /\ / \\ \ /\ | - | / V )) V \ | - |/ ` // ' \| - ` V ' - - we need to write the header back to the db at this point as the gas - used wasn't available from the data stream, or receipt hash, or bloom, so we're relying on execution to - provide it. We also need to update the canonical hash, so we can retrieve this newly updated header - later. 
- */ - if err := rawdb.WriteHeader_zkEvm(tx, header); err != nil { - return fmt.Errorf("WriteHeader_zkEvm: %w", err) - } - if err := rawdb.WriteHeadHeaderHash(tx, blockHash); err != nil { - return fmt.Errorf("WriteHeadHeaderHash: %w", err) - } - if err := rawdb.WriteCanonicalHash(tx, blockHash, blockNum); err != nil { - return fmt.Errorf("WriteCanonicalHash: %w", err) - } - // if err := eridb.WriteBody(block.Number(), blockHash, block.Transactions()); err != nil { - // return fmt.Errorf("failed to write body: %v", err) - // } - - // write the new block lookup entries - if err := rawdb.WriteTxLookupEntries_zkEvm(tx, block); err != nil { - return fmt.Errorf("WriteTxLookupEntries_zkEvm: %w", err) - } - - return nil -} - -func executeBlockZk( - block *types.Block, - prevBlockRoot *common.Hash, +func UnwindExecutionStageZk( + u *UnwindState, + s *StageState, tx kv.RwTx, - batch kv.StatelessRwTx, + ctx context.Context, cfg ExecuteBlockCfg, - vmConfig vm.Config, // emit copy, because will modify it - writeChangesets bool, - writeReceipts bool, - writeCallTraces bool, initialCycle bool, - stateStream bool, - roHermezDb state.ReadOnlyHermezDb, -) (execRs *core.EphemeralExecResultZk, err error) { - blockNum := block.NumberU64() - - stateReader, stateWriter, err := newStateReaderWriter(batch, tx, block, writeChangesets, cfg.accumulator, cfg.blockReader, stateStream) - if err != nil { - return nil, fmt.Errorf("newStateReaderWriter: %w", err) - } - - // where the magic happens - getHeader := func(hash common.Hash, number uint64) *types.Header { - h, _ := cfg.blockReader.Header(context.Background(), tx, hash, number) - return h - } - - getTracer := func(txIndex int, txHash common.Hash) (vm.EVMLogger, error) { - // return logger.NewJSONFileLogger(&logger.LogConfig{}, txHash.String()), nil - return logger.NewStructLogger(&logger.LogConfig{}), nil - } - - callTracer := calltracer.NewCallTracer() - vmConfig.Debug = true - vmConfig.Tracer = callTracer - - getHashFn := 
core.GetHashFn(block.Header(), getHeader) - if execRs, err = core.ExecuteBlockEphemerallyZk(cfg.chainConfig, &vmConfig, getHashFn, cfg.engine, block, stateReader, stateWriter, ChainReaderImpl{config: cfg.chainConfig, tx: tx, blockReader: cfg.blockReader}, getTracer, roHermezDb, prevBlockRoot); err != nil { - return nil, fmt.Errorf("ExecuteBlockEphemerallyZk: %w", err) - } - - if writeReceipts { - if err := rawdb.AppendReceipts(tx, blockNum, execRs.Receipts); err != nil { - return nil, fmt.Errorf("AppendReceipts: %w", err) - } - - stateSyncReceipt := execRs.StateSyncReceipt - if stateSyncReceipt != nil && stateSyncReceipt.Status == types.ReceiptStatusSuccessful { - if err := rawdb.WriteBorReceipt(tx, block.NumberU64(), stateSyncReceipt); err != nil { - return nil, fmt.Errorf("WriteBorReceipt: %w", err) - } - } - } - - if cfg.changeSetHook != nil { - if hasChangeSet, ok := stateWriter.(HasChangeSetWriter); ok { - cfg.changeSetHook(blockNum, hasChangeSet.ChangeSetWriter()) - } - } - if writeCallTraces { - if err := callTracer.WriteToDb(tx, block, *cfg.vmConfig); err != nil { - return nil, fmt.Errorf("WriteToDb: %w", err) - } - } - return execRs, nil -} - -func UnwindExecutionStageZk(u *UnwindState, s *StageState, tx kv.RwTx, ctx context.Context, cfg ExecuteBlockCfg, initialCycle bool) (err error) { +) (err error) { if u.UnwindPoint >= s.BlockNumber { return nil } @@ -510,7 +290,15 @@ func UnwindExecutionStageZk(u *UnwindState, s *StageState, tx kv.RwTx, ctx conte return nil } -func UnwindExecutionStageErigon(u *UnwindState, s *StageState, tx kv.RwTx, ctx context.Context, cfg ExecuteBlockCfg, initialCycle bool, logger log.Logger) error { +func UnwindExecutionStageErigon( + u *UnwindState, + s *StageState, + tx kv.RwTx, + ctx context.Context, + cfg ExecuteBlockCfg, + initialCycle bool, + logger log.Logger, +) error { return unwindExecutionStage(u, s, wrap.TxContainer{Tx: tx}, ctx, cfg, initialCycle, logger) }