diff --git a/.gitignore b/.gitignore index ac0f4efdf..b62ff4c00 100644 --- a/.gitignore +++ b/.gitignore @@ -42,3 +42,4 @@ profile.cov .vscode tests/spec-tests/ +bin/ \ No newline at end of file diff --git a/cmd/geth/chaincmd.go b/cmd/geth/chaincmd.go index f6dc1cf4b..c418d66d1 100644 --- a/cmd/geth/chaincmd.go +++ b/cmd/geth/chaincmd.go @@ -102,6 +102,10 @@ if one is set. Otherwise it prints the genesis from the datadir.`, utils.VMTraceJsonConfigFlag, utils.TransactionHistoryFlag, utils.StateHistoryFlag, + utils.BlockReplicationTargetsFlag, + utils.ReplicaEnableSpecimenFlag, + utils.ReplicaEnableResultFlag, + utils.ReplicaEnableBlobFlag, }, utils.DatabaseFlags), Description: ` The import command imports blocks from an RLP-encoded form. The form can be one file @@ -287,12 +291,15 @@ func importChain(ctx *cli.Context) error { // Start system runtime metrics collection go metrics.CollectProcessMetrics(3 * time.Second) - stack, _ := makeConfigNode(ctx) + stack, cfg := makeConfigNode(ctx) defer stack.Close() + replicators := utils.CreateReplicators(&cfg.Eth) chain, db := utils.MakeChain(ctx, stack, false) defer db.Close() + utils.AttachReplicators(replicators, chain) + // Start periodically gathering memory profiles var peakMemAlloc, peakMemSys atomic.Uint64 go func() { @@ -327,6 +334,7 @@ func importChain(ctx *cli.Context) error { } } chain.Stop() + utils.DrainReplicators(replicators) fmt.Printf("Import done in %v.\n\n", time.Since(start)) // Output pre-compaction stats mostly to see the import trashing diff --git a/cmd/geth/config.go b/cmd/geth/config.go index 17ed9fb60..de4f67dba 100644 --- a/cmd/geth/config.go +++ b/cmd/geth/config.go @@ -38,6 +38,7 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/eth/catalyst" "github.com/ethereum/go-ethereum/eth/ethconfig" + "github.com/ethereum/go-ethereum/internal/ethapi" "github.com/ethereum/go-ethereum/internal/flags" "github.com/ethereum/go-ethereum/internal/version" "github.com/ethereum/go-ethereum/log" @@ -181,7 +182,7 @@ func makeConfigNode(ctx *cli.Context) (*node.Node, gethConfig) { } // makeFullNode loads geth configuration and creates the Ethereum backend. -func makeFullNode(ctx *cli.Context) *node.Node { +func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) { stack, cfg := makeConfigNode(ctx) if ctx.IsSet(utils.OverrideCancun.Name) { v := ctx.Uint64(utils.OverrideCancun.Name) @@ -250,7 +251,7 @@ func makeFullNode(ctx *cli.Context) *node.Node { utils.Fatalf("failed to register catalyst service: %v", err) } } - return stack + return stack, backend } // dumpConfig is the dumpconfig command. diff --git a/cmd/geth/consolecmd.go b/cmd/geth/consolecmd.go index bf38c8634..44598206a 100644 --- a/cmd/geth/consolecmd.go +++ b/cmd/geth/consolecmd.go @@ -70,8 +70,8 @@ JavaScript API. See https://geth.ethereum.org/docs/interacting-with-geth/javascr func localConsole(ctx *cli.Context) error { // Create and start the node based on the CLI flags prepare(ctx) - stack := makeFullNode(ctx) - startNode(ctx, stack, true) + stack, backend := makeFullNode(ctx) + startNode(ctx, stack, true, backend) defer stack.Close() // Attach to the newly started node and create the JavaScript console. diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 1527e180f..3da3e7b70 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -32,6 +32,7 @@ import ( "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/internal/debug" + "github.com/ethereum/go-ethereum/internal/ethapi" "github.com/ethereum/go-ethereum/internal/flags" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" @@ -155,6 +156,10 @@ var ( utils.BeaconGenesisRootFlag, utils.BeaconGenesisTimeFlag, utils.BeaconCheckpointFlag, + utils.BlockReplicationTargetsFlag, + utils.ReplicaEnableResultFlag, + utils.ReplicaEnableSpecimenFlag, + utils.ReplicaEnableBlobFlag, }, utils.NetworkFlags, utils.DatabaseFlags) rpcFlags = []cli.Flag{ @@ -342,17 +347,17 @@ func geth(ctx *cli.Context) error { } prepare(ctx) - stack := makeFullNode(ctx) + stack, backend := makeFullNode(ctx) defer stack.Close() - startNode(ctx, stack, false) + startNode(ctx, stack, false, backend) stack.Wait() return nil } // startNode boots up the system node and all registered protocols, after which // it starts the RPC/IPC interfaces and the miner. -func startNode(ctx *cli.Context, stack *node.Node, isConsole bool) { +func startNode(ctx *cli.Context, stack *node.Node, isConsole bool, backend ethapi.Backend) { // Start up the node itself utils.StartNode(ctx, stack, isConsole) @@ -401,6 +406,38 @@ func startNode(ctx *cli.Context, stack *node.Node, isConsole bool) { } }() + // Kill bsp-geth if --syncmode flag is 'light' + if ctx.String(utils.BlockReplicationTargetsFlag.Name) != "" && ctx.String(utils.SyncModeFlag.Name) == "light" { + utils.Fatalf("Block specimen production not supported for 'light' sync (only supported modes are 'snap' and 'full'") + } + + // Spawn a standalone goroutine for status synchronization monitoring, + // if full sync is completed in block specimen creation mode set replica config flag + if ctx.Bool(utils.ReplicaEnableSpecimenFlag.Name) || ctx.Bool(utils.ReplicaEnableResultFlag.Name) { + //log.Info("Synchronisation started, historical blocks synced set to 0") + backend.SetHistoricalBlocksSynced() + + go func() { + sub := stack.EventMux().Subscribe(downloader.DoneEvent{}) + defer sub.Unsubscribe() + for { + event := <-sub.Chan() + if event == nil { + continue + } + done, ok := event.Data.(downloader.DoneEvent) + if !ok { + continue + } + if timestamp := time.Unix(int64(done.Latest.Time), 0); time.Since(timestamp) < 10*time.Minute { + log.Info("Synchronisation completed, setting historical blocks synced to 1", "latestnum", done.Latest.Number, "latesthash", done.Latest.Hash(), + "age", common.PrettyAge(timestamp)) + backend.SetHistoricalBlocksSynced() + } + } + }() + } + // Spawn a standalone goroutine for status synchronization monitoring, // close the node when synchronization is complete if user required. if ctx.Bool(utils.ExitWhenSyncedFlag.Name) { diff --git a/cmd/geth/misccmd.go b/cmd/geth/misccmd.go index 2d31f3abe..64d1dbddf 100644 --- a/cmd/geth/misccmd.go +++ b/cmd/geth/misccmd.go @@ -23,6 +23,7 @@ import ( "strings" "github.com/ethereum/go-ethereum/internal/version" + "github.com/ethereum/go-ethereum/params" "github.com/urfave/cli/v2" ) @@ -73,6 +74,7 @@ func printVersion(ctx *cli.Context) error { fmt.Println(strings.Title(clientIdentifier)) fmt.Println("Version:", version.WithMeta) + fmt.Println("Bsp Version:", params.BspVersion) if git.Commit != "" { fmt.Println("Git Commit:", git.Commit) } diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index e635dd89c..4cef2f057 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -28,6 +28,7 @@ import ( "math/big" "net" "net/http" + "net/url" "os" "path/filepath" godebug "runtime/debug" @@ -723,6 +724,23 @@ var ( Value: node.DefaultConfig.BatchResponseMaxSize, Category: flags.APICategory, } + BlockReplicationTargetsFlag = &cli.StringFlag{ + Name: "replication.targets", + Usage: "Comma separated URLs for message-queue delivery of block specimens", + Value: "", + } + ReplicaEnableSpecimenFlag = &cli.BoolFlag{ + Name: "replica.specimen", + Usage: "Enables export of fields that comprise a block-specimen", + } + ReplicaEnableResultFlag = &cli.BoolFlag{ + Name: "replica.result", + Usage: "Enables export of fields that comprise a block-result", + } + ReplicaEnableBlobFlag = &cli.BoolFlag{ + Name: "replica.blob", + Usage: "Enables export of fields that comprise a block-blob", + } // Network Settings MaxPeersFlag = &cli.IntFlag{ @@ -1584,7 +1602,9 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) { setMiner(ctx, &cfg.Miner) setRequiredBlocks(ctx, cfg) setLes(ctx, cfg) - + if ctx.IsSet(BlockReplicationTargetsFlag.Name) { + setBlockReplicationTargets(ctx, cfg) + } // Cap the cache allowance and tune the garbage collector mem, err := gopsutil.VirtualMemory() if err == nil { @@ -2245,3 +2265,63 @@ func MakeTrieDatabase(ctx *cli.Context, disk ethdb.Database, preimage bool, read } return triedb.NewDatabase(disk, config) } + +// setBlockResultTargets creates a list of replication targets from the command line flags. +func setBlockReplicationTargets(ctx *cli.Context, cfg *eth.Config) { + var urls []string + + if ctx.IsSet(BlockReplicationTargetsFlag.Name) { + urls = strings.Split(ctx.String(BlockReplicationTargetsFlag.Name), ",") + } + + cfg.BlockReplicationTargets = make([]string, 0, len(urls)) + for _, urlStr := range urls { + if urlStr != "" { + _, err := url.Parse(urlStr) + if err != nil { + log.Crit("Replication-target URL invalid", "url", urlStr, "err", err) + os.Exit(1) + } + cfg.BlockReplicationTargets = append(cfg.BlockReplicationTargets, urlStr) + } + } + if ctx.IsSet(ReplicaEnableResultFlag.Name) || ctx.IsSet(ReplicaEnableSpecimenFlag.Name) { + if ctx.Bool(ReplicaEnableSpecimenFlag.Name) { + cfg.ReplicaEnableSpecimen = true + } + if ctx.Bool(ReplicaEnableResultFlag.Name) { + cfg.ReplicaEnableResult = true + } + if ctx.Bool(ReplicaEnableBlobFlag.Name) { + cfg.ReplicaEnableBlob = true + } + } else { + Fatalf("--replication.targets flag is invalid without --replica.specimen and/or --replica.result, ONLY ADD --replica.blob with both replica.specimen AND replica.result flags for complete unified state capture)") + } +} + +func CreateReplicators(config *eth.Config) []*core.ChainReplicator { + replicators := make([]*core.ChainReplicator, 0) + + for _, blockReplicationTargets := range config.BlockReplicationTargets { + blockRepl, err := eth.CreateReplicator(blockReplicationTargets) + if err != nil { + Fatalf("Can't create replication target: %v", err) + } + replicators = append(replicators, blockRepl) + } + + return replicators +} + +func AttachReplicators(replicators []*core.ChainReplicator, chain *core.BlockChain) { + for _, replicator := range replicators { + replicator.Start(chain, chain.ReplicaConfig) + } +} + +func DrainReplicators(replicators []*core.ChainReplicator) { + for _, replicator := range replicators { + replicator.Stop() + } +} diff --git a/core/block_replica.go b/core/block_replica.go new file mode 100644 index 000000000..be8845a3d --- /dev/null +++ b/core/block_replica.go @@ -0,0 +1,207 @@ +package core + +import ( + "bytes" + "fmt" + "math/big" + "sync/atomic" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/rlp" +) + +type BlockReplicationEvent struct { + Hash string + Data []byte +} + +func (bc *BlockChain) createBlockReplica(block *types.Block, replicaConfig *ReplicaConfig, chainConfig *params.ChainConfig, stateSpecimen *types.StateSpecimen) error { + + // blobs + var blobTxSidecars []*types.BlobTxSidecar + // if replicaConfig.EnableBlob { + // for sidecarData := range types.BlobTxSidecarChan { + // if sidecarData.BlockNumber.Uint64() == block.NumberU64() { + // log.Info("Consuming BlobTxSidecar Match From Chain Sync Channel", "Block Number:", sidecarData.BlockNumber.Uint64()) + // blobTxSidecars = append(blobTxSidecars, sidecarData.Blobs) + // } else { + // log.Info("Failing BlobTxSidecar Match from Chain Sync Channel", "Block Number:", sidecarData.BlockNumber.Uint64()) + // } + // log.Info("BlobTxSidecar Header", "Block Number:", sidecarData.BlockNumber.Uint64()) + // log.Info("Chain Sync Sidecar Channel", "Length:", len(types.BlobTxSidecarChan)) + // } + // } + //block replica with blobs + exportBlockReplica, err := bc.createReplica(block, replicaConfig, chainConfig, stateSpecimen, blobTxSidecars) + if err != nil { + return err + } + //encode to rlp + blockReplicaRLP, err := rlp.EncodeToBytes(exportBlockReplica) + if err != nil { + log.Error("error encoding block replica rlp", "error", err) + return err + } + + sHash := block.Hash().String() + + if atomic.LoadUint32(replicaConfig.HistoricalBlocksSynced) == 0 { + log.Info("BSP running in Live mode", "Unexported block ", block.NumberU64(), "hash", sHash) + return nil + } else if atomic.LoadUint32(replicaConfig.HistoricalBlocksSynced) == 1 { + log.Info("Creating Block Specimen", "Exported block", block.NumberU64(), "hash", sHash) + bc.blockReplicationFeed.Send(BlockReplicationEvent{ + sHash, + blockReplicaRLP, + }) + return nil + } else { + return fmt.Errorf("error in setting atomic config historical block sync: %v", replicaConfig.HistoricalBlocksSynced) + } +} + +func (bc *BlockChain) createReplica(block *types.Block, replicaConfig *ReplicaConfig, chainConfig *params.ChainConfig, stateSpecimen *types.StateSpecimen, blobSpecimen []*types.BlobTxSidecar) (*types.ExportBlockReplica, error) { + log.Info("Creating Block Replica", "Block Number:", block.NumberU64(), "Block Hash:", block.Hash().String()) + bHash := block.Hash() + bNum := block.NumberU64() + + //totalDifficulty + tdRLP := rawdb.ReadTdRLP(bc.db, bHash, bNum) + td := new(big.Int) + if err := rlp.Decode(bytes.NewReader(tdRLP), td); err != nil { + log.Error("Invalid block total difficulty RLP ", "hash ", bHash, "err", err) + return nil, err + } + + //header + headerRLP := rawdb.ReadHeaderRLP(bc.db, bHash, bNum) + header := new(types.Header) + if err := rlp.Decode(bytes.NewReader(headerRLP), header); err != nil { + log.Error("Invalid block header RLP ", "hash ", bHash, "err ", err) + return nil, err + } + + //transactions + txsExp := make([]*types.TransactionForExport, len(block.Transactions())) + txsRlp := make([]*types.TransactionExportRLP, len(block.Transactions())) + for i, tx := range block.Transactions() { + txsExp[i] = (*types.TransactionForExport)(tx) + txsRlp[i] = txsExp[i].ExportTx(chainConfig, block.Number(), header.BaseFee, header.Time) + if !replicaConfig.EnableSpecimen { + txsRlp[i].V, txsRlp[i].R, txsRlp[i].S = nil, nil, nil + } + } + + // withdrawals + var withdrawalsRlp []*types.WithdrawalExportRLP = nil + if chainConfig.IsShanghai(block.Number(), block.Time()) { + withdrawalsExp := make([]*types.WithdrawalForExport, len(block.Withdrawals())) + withdrawalsRlp = make([]*types.WithdrawalExportRLP, len(block.Withdrawals())) + for i, withdrawal := range block.Withdrawals() { + withdrawalsExp[i] = (*types.WithdrawalForExport)(withdrawal) + withdrawalsRlp[i] = withdrawalsExp[i].ExportWithdrawal() + } + } + + //receipts + receipts := rawdb.ReadRawReceipts(bc.db, bHash, bNum) + receiptsExp := make([]*types.ReceiptForExport, len(receipts)) + receiptsRlp := make([]*types.ReceiptExportRLP, len(receipts)) + for i, receipt := range receipts { + receiptsExp[i] = (*types.ReceiptForExport)(receipt) + receiptsRlp[i] = receiptsExp[i].ExportReceipt() + } + + //senders + signer := types.MakeSigner(bc.chainConfig, block.Number(), block.Time()) + senders := make([]common.Address, 0, len(block.Transactions())) + for _, tx := range block.Transactions() { + sender, err := types.Sender(signer, tx) + if err != nil { + return nil, err + } else { + senders = append(senders, sender) + } + } + + //uncles + uncles := block.Uncles() + + //block replica export + if replicaConfig.EnableSpecimen && replicaConfig.EnableResult && replicaConfig.EnableBlob { + exportBlockReplica := &types.ExportBlockReplica{ + Type: "block-replica", + NetworkId: chainConfig.ChainID.Uint64(), + Hash: bHash, + TotalDiff: td, + Header: header, + Transactions: txsRlp, + Uncles: uncles, + Receipts: receiptsRlp, + Senders: senders, + State: stateSpecimen, + Withdrawals: withdrawalsRlp, + BlobTxSidecars: []*types.BlobTxSidecar{}, + } + log.Debug("Exporting full block-replica with blob-specimen") + return exportBlockReplica, nil + } else if replicaConfig.EnableSpecimen && !replicaConfig.EnableResult { + exportBlockReplica := &types.ExportBlockReplica{ + Type: "block-specimen", + NetworkId: chainConfig.ChainID.Uint64(), + Hash: bHash, + TotalDiff: td, + Header: header, + Transactions: txsRlp, + Uncles: uncles, + Receipts: []*types.ReceiptExportRLP{}, + Senders: senders, + State: stateSpecimen, + Withdrawals: withdrawalsRlp, + BlobTxSidecars: []*types.BlobTxSidecar{}, + } + log.Debug("Exporting block-specimen only (no blob specimens)") + return exportBlockReplica, nil + } else if !replicaConfig.EnableSpecimen && replicaConfig.EnableResult { + exportBlockReplica := &types.ExportBlockReplica{ + Type: "block-result", + NetworkId: chainConfig.ChainID.Uint64(), + Hash: bHash, + TotalDiff: td, + Header: header, + Transactions: txsRlp, + Uncles: uncles, + Receipts: receiptsRlp, + Senders: senders, + State: &types.StateSpecimen{}, + BlobTxSidecars: []*types.BlobTxSidecar{}, + } + log.Debug("Exporting block-result only (no blob specimens)") + return exportBlockReplica, nil + } else { + return nil, fmt.Errorf("--replication.targets flag is invalid without --replica.specimen and/or --replica.result, ADD --replica.blob with both replica.specimen AND replica.result flags for complete unified state capture aka block-replica)") + } +} + +// SubscribeChainReplicationEvent registers a subscription of ChainReplicationEvent. +func (bc *BlockChain) SubscribeBlockReplicationEvent(ch chan<- BlockReplicationEvent) event.Subscription { + return bc.scope.Track(bc.blockReplicationFeed.Subscribe(ch)) +} + +func (bc *BlockChain) SetBlockReplicaExports(replicaConfig *ReplicaConfig) bool { + if replicaConfig.EnableResult { + bc.ReplicaConfig.EnableResult = true + } + if replicaConfig.EnableSpecimen { + bc.ReplicaConfig.EnableSpecimen = true + } + if replicaConfig.EnableBlob { + bc.ReplicaConfig.EnableBlob = true + } + return true +} diff --git a/core/blockchain.go b/core/blockchain.go index c3da61b28..63c5cee53 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -259,6 +259,16 @@ type BlockChain struct { processor Processor // Block transaction processor interface vmConfig vm.Config logger *tracing.Hooks + + blockReplicationFeed event.Feed + ReplicaConfig *ReplicaConfig +} + +type ReplicaConfig struct { + EnableSpecimen bool + EnableResult bool + EnableBlob bool + HistoricalBlocksSynced *uint32 } // NewBlockChain returns a fully initialised block chain using information @@ -287,21 +297,28 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis log.Info("") bc := &BlockChain{ - chainConfig: chainConfig, - cacheConfig: cacheConfig, - db: db, - triedb: triedb, - triegc: prque.New[int64, common.Hash](nil), - quit: make(chan struct{}), - chainmu: syncx.NewClosableMutex(), - bodyCache: lru.NewCache[common.Hash, *types.Body](bodyCacheLimit), - bodyRLPCache: lru.NewCache[common.Hash, rlp.RawValue](bodyCacheLimit), - receiptsCache: lru.NewCache[common.Hash, []*types.Receipt](receiptsCacheLimit), - blockCache: lru.NewCache[common.Hash, *types.Block](blockCacheLimit), - txLookupCache: lru.NewCache[common.Hash, txLookup](txLookupCacheLimit), - engine: engine, - vmConfig: vmConfig, - logger: vmConfig.Tracer, + chainConfig: chainConfig, + cacheConfig: cacheConfig, + db: db, + triedb: triedb, + triegc: prque.New[int64, common.Hash](nil), + quit: make(chan struct{}), + chainmu: syncx.NewClosableMutex(), + bodyCache: lru.NewCache[common.Hash, *types.Body](bodyCacheLimit), + bodyRLPCache: lru.NewCache[common.Hash, rlp.RawValue](bodyCacheLimit), + receiptsCache: lru.NewCache[common.Hash, []*types.Receipt](receiptsCacheLimit), + blockCache: lru.NewCache[common.Hash, *types.Block](blockCacheLimit), + txLookupCache: lru.NewCache[common.Hash, txLookup](txLookupCacheLimit), + engine: engine, + vmConfig: vmConfig, + logger: vmConfig.Tracer, + blockReplicationFeed: event.Feed{}, + ReplicaConfig: &ReplicaConfig{ + EnableSpecimen: false, + EnableResult: false, + EnableBlob: false, + HistoricalBlocksSynced: new(uint32), // Always set 0 for historical mode at start + }, } var err error bc.hc, err = NewHeaderChain(db, chainConfig, engine, bc.insertStopped) @@ -1774,7 +1791,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool, makeWitness if err != nil { return nil, it.index, err } - + // Enable prefetching to pull in trie node paths while processing transactions + statedb.EnableStateSpecimenTracking() // If we are past Byzantium, enable prefetching to pull in trie node paths // while processing transactions. Before Byzantium the prefetcher is mostly // useless due to the intermediate root hashing after each transaction. @@ -1831,6 +1849,10 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool, makeWitness stats.report(chain, it.index, snapDiffItems, snapBufItems, trieDiffNodes, trieBufNodes, setHead) if !setHead { + // Export Block Specimen + if bc.ReplicaConfig.EnableSpecimen || bc.ReplicaConfig.EnableResult { + bc.createBlockReplica(block, bc.ReplicaConfig, bc.chainConfig, statedb.TakeStateSpecimen()) + } // After merge we expect few side chains. Simply count // all blocks the CL gives us for GC processing time bc.gcproc += res.procTime @@ -1842,7 +1864,10 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool, makeWitness "uncles", len(block.Uncles()), "txs", len(block.Transactions()), "gas", block.GasUsed(), "elapsed", common.PrettyDuration(time.Since(start)), "root", block.Root()) - + // Handle creation of block specimen for canonical blocks + if bc.ReplicaConfig.EnableSpecimen || bc.ReplicaConfig.EnableResult { + bc.createBlockReplica(block, bc.ReplicaConfig, bc.chainConfig, statedb.TakeStateSpecimen()) + } lastCanon = block // Only count canonical blocks for GC processing time @@ -1853,6 +1878,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool, makeWitness "diff", block.Difficulty(), "elapsed", common.PrettyDuration(time.Since(start)), "txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles()), "root", block.Root()) + // Currently proof-chain is not handling the forked block use case hence commented out + //bc.createBlockReplica(block, bc.ReplicaConfig, bc.chainConfig, statedb.TakeStateSpecimen()) default: // This in theory is impossible, but lets be nice to our future selves and leave @@ -1862,6 +1889,10 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool, makeWitness "txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles()), "root", block.Root()) } + // Is impossible but keeping in line to be nice to our future selves we add this for now + if bc.ReplicaConfig.EnableSpecimen || bc.ReplicaConfig.EnableResult { + bc.createBlockReplica(block, bc.ReplicaConfig, bc.chainConfig, statedb.TakeStateSpecimen()) + } } stats.ignored += it.remaining() return witness, it.index, err diff --git a/core/chain_replication.go b/core/chain_replication.go new file mode 100644 index 000000000..a4072393f --- /dev/null +++ b/core/chain_replication.go @@ -0,0 +1,326 @@ +package core + +import ( + "context" + "errors" + "sync" + "sync/atomic" + "time" + + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" +) + +type ChainReplicationBackend interface { + Process(ctx context.Context, events []*BlockReplicationEvent) error + String() string +} + +// ChainReplicationChain interface is used for connecting the replicator to a blockchain +type ChainReplicatorChain interface { + // SubscribeChainReplicationEvent subscribes to new replication notifications. + SubscribeBlockReplicationEvent(ch chan<- BlockReplicationEvent) event.Subscription + // Set Block replica export types + SetBlockReplicaExports(replicaConfig *ReplicaConfig) bool +} + +type ChainReplicator struct { + sessionId uint64 + + backend ChainReplicationBackend + + mode uint32 + modeLock sync.Mutex + drain chan struct{} + exitStatus chan error + ctx context.Context + ctxCancel func() + + log *replicationLogger +} + +var replicationSessionSeq uint64 + +func NewChainReplicator(backend ChainReplicationBackend) *ChainReplicator { + sessionId := atomic.AddUint64(&replicationSessionSeq, 1) + + c := &ChainReplicator{ + sessionId: sessionId, + backend: backend, + drain: make(chan struct{}), + exitStatus: make(chan error), + log: &replicationLogger{log: log.New("sessID", sessionId)}, + } + + c.ctx, c.ctxCancel = context.WithCancel(context.Background()) + + return c +} + +const ( + modeNotStarted uint32 = iota + modeStarting + modeRunning + modeStopping +) + +func (c *ChainReplicator) Start(chain ChainReplicatorChain, replicaConfig *ReplicaConfig) { + c.modeLock.Lock() + defer c.modeLock.Unlock() + + if !atomic.CompareAndSwapUint32(&c.mode, modeNotStarted, modeStarting) { + return + } + + c.log.Info("Replication began", "backend", c.backend.String()) + + bSEvents := make(chan BlockReplicationEvent, 1000) + bSSub := chain.SubscribeBlockReplicationEvent(bSEvents) + _ = chain.SetBlockReplicaExports(replicaConfig) + go c.eventLoop(bSEvents, bSSub) +} + +func (c *ChainReplicator) Stop() (err error) { + c.modeLock.Lock() + defer c.modeLock.Unlock() + + if !atomic.CompareAndSwapUint32(&c.mode, modeRunning, modeStopping) { + return + } + + close(c.drain) + err = <-c.exitStatus + atomic.StoreUint32(&c.mode, modeNotStarted) + + return +} + +func (c *ChainReplicator) CloseImmediate() (err error) { + if atomic.LoadUint32(&c.mode) == modeStopping { + // Stop() or another CloseImmediate() is already holding the lock, + // so just hurry it along (idempotently) + c.ctxCancel() + + // wait for the other task to finish + c.modeLock.Lock() + defer c.modeLock.Unlock() + + return + } + + c.modeLock.Lock() + defer c.modeLock.Unlock() + + if !atomic.CompareAndSwapUint32(&c.mode, modeRunning, modeStopping) { + return + } + + c.ctxCancel() + err = <-c.exitStatus + atomic.StoreUint32(&c.mode, modeNotStarted) + + return +} + +var ( + errComplete = errors.New("replication complete") + errDraining = errors.New("draining") + errUnsubscribed = errors.New("unsubscribed") + errContextDone = errors.New("context completed") +) + +func (c *ChainReplicator) eventLoop(events chan BlockReplicationEvent, sub event.Subscription) { + defer sub.Unsubscribe() + defer close(c.exitStatus) + defer c.log.nextSession() + + atomic.StoreUint32(&c.mode, modeRunning) + + var ( + draining bool + unsubbed bool + + flush bool + lastFlushTime = time.Now() + lastReportTime = lastFlushTime + + ticker = time.NewTicker(1 * time.Second) + eventBuf = make([]*BlockReplicationEvent, 0, 500) + + stateChange = make(chan error, 2) + loopDone = make(chan struct{}) + ) + + defer ticker.Stop() + defer close(loopDone) + + go func() { + select { + case err, ok := <-sub.Err(): + if ok { + stateChange <- err + } else { + stateChange <- errUnsubscribed + } + return + case <-loopDone: + return + } + }() + + go func() { + select { + case <-c.ctx.Done(): + stateChange <- errContextDone + return + case <-loopDone: + return + } + }() + + go func() { + select { + case <-c.drain: + stateChange <- errDraining + case <-loopDone: + return + } + }() + + for { + select { + case err := <-stateChange: + switch err { + case errComplete: + c.log.Info("Replication complete") + return + + case errDraining: + if !draining { + c.log.Info("Replication queue draining") + draining = true + } + + case errUnsubscribed: + if !unsubbed { + c.log.Debug("Replication producer unsubscribed") + unsubbed = true + flush = true + } + + case errContextDone: + c.log.Info("Replication interrupted") + return + + default: + // a real error, from the subscription producer + c.log.Warn("Replication failure", "err", err) + c.exitStatus <- err + return + } + + case ev, ok := <-events: + if ok { + eventBuf = append(eventBuf, &ev) + + if len(eventBuf) == 500 { + flush = true + } + } else { + stateChange <- errUnsubscribed + } + + case t := <-ticker.C: + if t.Sub(lastReportTime) >= (8 * time.Second) { + if len(eventBuf) > 0 || c.log.IsDirty() { + c.log.Info("Replication progress", "queued", len(eventBuf)) + } + lastReportTime = t + } + + if len(eventBuf) > 0 && t.Sub(lastFlushTime) >= (3*time.Second) { + flush = true + } else if draining && len(eventBuf) == 0 && t.Sub(lastFlushTime) >= (2*time.Second) { + c.log.Info("Replication complete (queue drained)") + return + } + } + + if flush { + if len(eventBuf) > 0 { + if err := c.backend.Process(c.ctx, eventBuf); err != nil { + stateChange <- err + } else { + c.log.sent(eventBuf) + c.log.Debug("Replication segment", "len", len(eventBuf)) + } + } + + flush = false + lastFlushTime = time.Now() + eventBuf = eventBuf[:0] + + if unsubbed { + stateChange <- errComplete + unsubbed = false + } + } + } +} + +type replicationLogger struct { + log log.Logger + flushedCount uint64 + hashValid bool + lastHash string + dirty bool +} + +func (rl *replicationLogger) IsDirty() bool { + return rl.dirty +} + +func (rl *replicationLogger) appendMetrics(input []interface{}) []interface{} { + rl.dirty = false + + if rl.hashValid { + return append(input, "sent", rl.flushedCount, "last", rl.lastHash) + } else { + return append(input, "sent", rl.flushedCount) + } +} + +func (rl *replicationLogger) nextSession() { + rl.hashValid = false + rl.dirty = false +} + +func (rl *replicationLogger) sent(eventBuf []*BlockReplicationEvent) { + if len(eventBuf) == 0 { + return + } + + rl.flushedCount += uint64(len(eventBuf)) + rl.lastHash = eventBuf[len(eventBuf)-1].Hash + rl.hashValid = true + rl.dirty = true +} + +func (rl *replicationLogger) Trace(slug string, ctx ...interface{}) { + rl.log.Trace(slug, rl.appendMetrics(ctx)...) +} +func (rl *replicationLogger) Debug(slug string, ctx ...interface{}) { + rl.log.Debug(slug, rl.appendMetrics(ctx)...) +} +func (rl *replicationLogger) Info(slug string, ctx ...interface{}) { + rl.log.Info(slug, rl.appendMetrics(ctx)...) +} +func (rl *replicationLogger) Warn(slug string, ctx ...interface{}) { + rl.log.Warn(slug, rl.appendMetrics(ctx)...) +} +func (rl *replicationLogger) Error(slug string, ctx ...interface{}) { + rl.log.Error(slug, rl.appendMetrics(ctx)...) +} +func (rl *replicationLogger) Crit(slug string, ctx ...interface{}) { + rl.log.Crit(slug, rl.appendMetrics(ctx)...) +} diff --git a/core/state/state_object.go b/core/state/state_object.go index b659bf7ff..39b662490 100644 --- a/core/state/state_object.go +++ b/core/state/state_object.go @@ -202,6 +202,9 @@ func (s *stateObject) GetCommittedState(key common.Hash) common.Hash { log.Error("Failed to prefetch storage slot", "addr", s.address, "key", key, "err", err) } } + if sS := s.db.stateSpecimen; sS != nil { + sS.LogStorageRead(s.address, key, value) + } s.originStorage[key] = value return value } @@ -514,6 +517,9 @@ func (s *stateObject) Code() []byte { if err != nil { s.db.setError(fmt.Errorf("can't load code hash %x: %v", s.CodeHash(), err)) } + if sS := s.db.stateSpecimen; sS != nil { + sS.LogCodeRead(s.CodeHash(), code) + } s.code = code return code } diff --git a/core/state/statedb.go b/core/state/statedb.go index d855e5626..07899d49f 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -155,6 +155,9 @@ type StateDB struct { StorageLoaded int // Number of storage slots retrieved from the database during the state transition StorageUpdated atomic.Int64 // Number of storage slots updated during the state transition StorageDeleted atomic.Int64 // Number of storage slots deleted during the state transition + + // Log of state data read from backing DB + stateSpecimen *types.StateSpecimen } // New creates a new state from a given trie. @@ -180,10 +183,12 @@ func New(root common.Hash, db Database) (*StateDB, error) { journal: newJournal(), accessList: newAccessList(), transientStorage: newTransientStorage(), + stateSpecimen: types.NewStateSpecimen(), } if db.TrieDB().IsVerkle() { sdb.accessEvents = NewAccessEvents(db.PointCache()) } + return sdb, nil } @@ -596,10 +601,11 @@ func (s *StateDB) getStateObject(addr common.Address) *stateObject { log.Error("Failed to prefetch account", "addr", addr, "err", err) } } + s.stateSpecimen.LogAccountRead(addr, acct.Nonce, acct.Balance.ToBig(), acct.CodeHash) + // Insert into the live set obj := newObject(s, addr, acct) s.setStateObject(obj) - s.AccountLoaded++ return obj } @@ -675,6 +681,7 @@ func (s *StateDB) Copy() *StateDB { accessList: s.accessList.Copy(), transientStorage: s.transientStorage.Copy(), journal: s.journal.copy(), + stateSpecimen: types.NewStateSpecimen(), } if s.witness != nil { state.witness = s.witness.Copy() @@ -682,6 +689,9 @@ func (s *StateDB) Copy() *StateDB { if s.accessEvents != nil { state.accessEvents = s.accessEvents.Copy() } + if s.stateSpecimen != nil { + state.stateSpecimen = s.stateSpecimen.Copy() + } // Deep copy cached state objects. for addr, obj := range s.stateObjects { state.stateObjects[addr] = obj.deepCopy(state) @@ -1417,3 +1427,17 @@ func (s *StateDB) Witness() *stateless.Witness { func (s *StateDB) AccessEvents() *AccessEvents { return s.accessEvents } + +func (s *StateDB) EnableStateSpecimenTracking() { + s.stateSpecimen = types.NewStateSpecimen() +} + +func (s *StateDB) TakeStateSpecimen() *types.StateSpecimen { + sp := s.stateSpecimen + sp.BlockhashReadMap = make(map[uint64]common.Hash) + return sp +} + +func (s *StateDB) GetStateSpecimen() *types.StateSpecimen { + return s.stateSpecimen +} diff --git a/core/state/statedb_hooked.go b/core/state/statedb_hooked.go index 26d021709..f4186e4c9 100644 --- a/core/state/statedb_hooked.go +++ b/core/state/statedb_hooked.go @@ -264,3 +264,7 @@ func (s *hookedStateDB) Finalise(deleteEmptyObjects bool) { } } } + +func (s *hookedStateDB) GetStateSpecimen() *types.StateSpecimen { + return s.inner.GetStateSpecimen() +} diff --git a/core/types/block_export.go b/core/types/block_export.go new file mode 100644 index 000000000..06e90a9ca --- /dev/null +++ b/core/types/block_export.go @@ -0,0 +1,162 @@ +package types + +import ( + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/params" +) + +type ExportBlockReplica struct { + Type string + NetworkId uint64 + Hash common.Hash + TotalDiff *big.Int + Header *Header + Transactions []*TransactionExportRLP + Uncles []*Header + Receipts []*ReceiptExportRLP + Senders []common.Address + State *StateSpecimen + Withdrawals []*WithdrawalExportRLP + BlobTxSidecars []*BlobTxSidecar +} + +type LogsExportRLP struct { + Address common.Address `json:"address"` + Topics []common.Hash `json:"topics"` + Data []byte `json:"data"` + BlockNumber uint64 `json:"blockNumber"` + TxHash common.Hash `json:"transactionHash"` + TxIndex uint `json:"transactionIndex"` + BlockHash common.Hash `json:"blockHash"` + Index uint `json:"logIndex"` + Removed bool `json:"removed"` +} + +type ReceiptForExport Receipt + +type ReceiptExportRLP struct { + PostStateOrStatus []byte + CumulativeGasUsed uint64 + TxHash common.Hash + ContractAddress common.Address + Logs []*LogsExportRLP + GasUsed uint64 +} + +type WithdrawalForExport Withdrawal + +type WithdrawalExportRLP struct { + Index uint64 `json:"index"` // monotonically increasing identifier issued by consensus layer + Validator uint64 `json:"validatorIndex"` // index of validator associated with withdrawal + Address common.Address `json:"address"` // target address for withdrawn ether + Amount uint64 `json:"amount"` // value of withdrawal in Gwei +} + +type TransactionForExport Transaction + +type TransactionExportRLP struct { + Type byte `json:"type"` + AccessList AccessList `json:"accessList"` + ChainId *big.Int `json:"chainId"` + AccountNonce uint64 `json:"nonce"` + Price *big.Int `json:"gasPrice"` + GasLimit uint64 `json:"gas"` + GasTipCap *big.Int `json:"gasTipCap"` + GasFeeCap *big.Int `json:"gasFeeCap"` + Sender *common.Address `json:"from" rlp:"nil"` + Recipient *common.Address `json:"to" rlp:"nil"` // nil means contract creation + Amount *big.Int `json:"value"` + Payload []byte `json:"input"` + V *big.Int `json:"v" rlp:"nil"` + R *big.Int `json:"r" rlp:"nil"` + S *big.Int `json:"s" rlp:"nil"` + BlobFeeCap *big.Int `json:"blobFeeCap" rlp:"optional"` + BlobHashes []common.Hash `json:"blobHashes" rlp:"optional"` + BlobGas uint64 `json:"blobGas" rlp:"optional"` +} + +type BlobTxSidecarData struct { + Blobs *BlobTxSidecar + BlockNumber *big.Int +} + +var BlobTxSidecarChan = make(chan *BlobTxSidecarData, 100) + +func (r *ReceiptForExport) ExportReceipt() *ReceiptExportRLP { + enc := &ReceiptExportRLP{ + PostStateOrStatus: (*Receipt)(r).statusEncoding(), + GasUsed: r.GasUsed, + CumulativeGasUsed: r.CumulativeGasUsed, + TxHash: r.TxHash, + ContractAddress: r.ContractAddress, + Logs: make([]*LogsExportRLP, len(r.Logs)), + } + for i, log := range r.Logs { + enc.Logs[i] = (*LogsExportRLP)(log) + } + return enc +} + +func (r *WithdrawalForExport) ExportWithdrawal() *WithdrawalExportRLP { + return &WithdrawalExportRLP{ + Index: r.Index, + Validator: r.Validator, + Address: r.Address, + Amount: r.Amount, + } +} + +func (tx *TransactionForExport) ExportTx(chainConfig *params.ChainConfig, blockNumber *big.Int, baseFee *big.Int, blockTime uint64) *TransactionExportRLP { + var inner_tx *Transaction = (*Transaction)(tx) + v, r, s := tx.inner.rawSignatureValues() + var signer Signer = MakeSigner(chainConfig, blockNumber, blockTime) + from, _ := Sender(signer, inner_tx) + + txData := tx.inner + + if inner_tx.Type() == BlobTxType { + return &TransactionExportRLP{ + AccountNonce: txData.nonce(), + Price: txData.effectiveGasPrice(&big.Int{}, baseFee), + GasLimit: txData.gas(), + Sender: &from, + Recipient: txData.to(), + Amount: txData.value(), + Payload: txData.data(), + Type: txData.txType(), + ChainId: txData.chainID(), + AccessList: txData.accessList(), + GasTipCap: txData.gasTipCap(), + GasFeeCap: txData.gasFeeCap(), + V: v, + R: r, + S: s, + BlobFeeCap: inner_tx.BlobGasFeeCap(), + BlobHashes: inner_tx.BlobHashes(), + BlobGas: inner_tx.BlobGas(), + } + } else { + return &TransactionExportRLP{ + AccountNonce: txData.nonce(), + Price: txData.effectiveGasPrice(&big.Int{}, baseFee), + GasLimit: txData.gas(), + Sender: &from, + Recipient: txData.to(), + Amount: txData.value(), + Payload: txData.data(), + Type: txData.txType(), + ChainId: txData.chainID(), + AccessList: txData.accessList(), + GasTipCap: txData.gasTipCap(), + GasFeeCap: txData.gasFeeCap(), + V: v, + R: r, + S: s, + BlobFeeCap: &big.Int{}, + BlobHashes: make([]common.Hash, 0), + BlobGas: 0, + } + } +} diff --git a/core/types/block_specimen.go b/core/types/block_specimen.go new file mode 100644 index 000000000..47587c80c --- /dev/null +++ b/core/types/block_specimen.go @@ -0,0 +1,110 @@ +package types + +import ( + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" +) + +type StateSpecimen struct { + AccountRead []*accountRead + StorageRead []*storageRead + CodeRead []*codeRead + BlockhashRead []*blockhashRead + BlockhashReadMap map[uint64]common.Hash `json:"-" rlp:"-"` // ignore in encoding/decoding +} + +type accountRead struct { + Address common.Address + Nonce uint64 + Balance *big.Int + CodeHash common.Hash +} + +type storageRead struct { + Account common.Address + SlotKey common.Hash + Value common.Hash +} + +type codeRead struct { + Hash common.Hash + Code []byte +} + +type blockhashRead struct { + BlockNumber uint64 + BlockHash common.Hash +} + +func NewStateSpecimen() *StateSpecimen { + sp := &StateSpecimen{ + BlockhashReadMap: make(map[uint64]common.Hash), + } + return sp +} + +func (sp *StateSpecimen) Copy() *StateSpecimen { + cpy := StateSpecimen{ + AccountRead: make([]*accountRead, 0), + StorageRead: make([]*storageRead, 0), + CodeRead: make([]*codeRead, 0), + BlockhashRead: make([]*blockhashRead, 0), + BlockhashReadMap: make(map[uint64]common.Hash), + } + + return &cpy +} + +func (sp *StateSpecimen) LogAccountRead(addr common.Address, nonce uint64, balance *big.Int, codeHashB []byte) *StateSpecimen { + codeHash := common.BytesToHash(codeHashB) + log.Trace("Retrieved committed account", "addr", addr, "nonce", nonce, "balance", balance, "codeHash", codeHash) + + sp.AccountRead = append(sp.AccountRead, &accountRead{ + Address: addr, + Nonce: nonce, + Balance: balance, + CodeHash: codeHash, + }) + + return sp +} + +func (sp *StateSpecimen) LogStorageRead(account common.Address, slotKey common.Hash, value common.Hash) *StateSpecimen { + log.Trace("Retrieved committed storage", "account", account, "slotKey", slotKey, "value", value) + + sp.StorageRead = append(sp.StorageRead, &storageRead{ + Account: account, + SlotKey: slotKey, + Value: value, + }) + + return sp +} + +func (sp *StateSpecimen) LogCodeRead(hashB []byte, code []byte) *StateSpecimen { + hash := common.BytesToHash(hashB) + log.Trace("Retrieved code", "hash", hash, "len", len(code)) + + sp.CodeRead = append(sp.CodeRead, &codeRead{ + Hash: hash, + Code: code, + }) + + return sp +} + +func (sp *StateSpecimen) LogBlockhashRead(blockN uint64, blockHash common.Hash) *StateSpecimen { + log.Trace("Retrieved BlockHash", "block_number", blockN, "hash", blockHash) + + if _, ok := sp.BlockhashReadMap[blockN]; !ok { + sp.BlockhashReadMap[blockN] = blockHash + sp.BlockhashRead = append(sp.BlockhashRead, &blockhashRead{ + BlockNumber: blockN, + BlockHash: blockHash, + }) + } + + return sp +} diff --git a/core/vm/instructions.go b/core/vm/instructions.go index 47eb62be0..420d20265 100644 --- a/core/vm/instructions.go +++ b/core/vm/instructions.go @@ -443,8 +443,9 @@ func opBlockhash(pc *uint64, interpreter *EVMInterpreter, scope *ScopeContext) ( } else { lower = upper - 256 } + var res common.Hash if num64 >= lower && num64 < upper { - res := interpreter.evm.Context.GetHash(num64) + res = interpreter.evm.Context.GetHash(num64) if witness := interpreter.evm.StateDB.Witness(); witness != nil { witness.AddBlockHash(num64) } @@ -452,6 +453,9 @@ func opBlockhash(pc *uint64, interpreter *EVMInterpreter, scope *ScopeContext) ( } else { num.Clear() } + if specimen := interpreter.evm.StateDB.GetStateSpecimen(); specimen != nil { + specimen.LogBlockhashRead(num64, res) + } return nil, nil } diff --git a/core/vm/interface.go b/core/vm/interface.go index 9229f4d2c..9741e1697 100644 --- a/core/vm/interface.go +++ b/core/vm/interface.go @@ -98,6 +98,7 @@ type StateDB interface { // Finalise must be invoked at the end of a transaction Finalise(bool) + GetStateSpecimen() *types.StateSpecimen } // CallContext provides a basic interface for the EVM calling conventions. The EVM diff --git a/eth/api_backend.go b/eth/api_backend.go index 4e81d68e0..1a990f159 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -20,6 +20,7 @@ import ( "context" "errors" "math/big" + "sync/atomic" "time" "github.com/ethereum/go-ethereum" @@ -38,6 +39,7 @@ import ( "github.com/ethereum/go-ethereum/eth/tracers" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rpc" ) @@ -427,3 +429,18 @@ func (b *EthAPIBackend) StateAtBlock(ctx context.Context, block *types.Block, re func (b *EthAPIBackend) StateAtTransaction(ctx context.Context, block *types.Block, txIndex int, reexec uint64) (*types.Transaction, vm.BlockContext, *state.StateDB, tracers.StateReleaseFunc, error) { return b.eth.stateAtTransaction(ctx, block, txIndex, reexec) } + +// SetHistoricalBlocksSynced returns a bool for BSP replica config (Historical mode :0 , Live mode: 1) +func (b *EthAPIBackend) SetHistoricalBlocksSynced() bool { + if b.eth.Synced() { + atomic.StoreUint32(b.eth.blockchain.ReplicaConfig.HistoricalBlocksSynced, 1) + log.Info("Fully Synced, BSP running in live sync mode", "BSP Mode Config: ", atomic.LoadUint32(b.eth.blockchain.ReplicaConfig.HistoricalBlocksSynced)) + return true + } else { + // log.Error("Not accepting new transactions, BSP running in historical sync mode", "BSP Mode Config: ", atomic.LoadUint32(b.eth.blockchain.ReplicaConfig.HistoricalBlocksSynced)) + // return false + atomic.StoreUint32(b.eth.blockchain.ReplicaConfig.HistoricalBlocksSynced, 1) + log.Info("Fully Synced, BSP running in live sync mode", "BSP Mode Config: ", atomic.LoadUint32(b.eth.blockchain.ReplicaConfig.HistoricalBlocksSynced)) + return true + } +} diff --git a/eth/backend.go b/eth/backend.go index ccfe650f4..12fdad25a 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -97,7 +97,9 @@ type Ethereum struct { lock sync.RWMutex // Protects the variadic fields (e.g. gas price and etherbase) - shutdownTracker *shutdowncheck.ShutdownTracker // Tracks if and when the node has shutdown ungracefully + shutdownTracker *shutdowncheck.ShutdownTracker // Tracks if and when the node has shutdown ungracefully + blockReplicators []*core.ChainReplicator + ReplicaConfig *core.ReplicaConfig } // New creates a new Ethereum object (including the initialisation of the common Ethereum object), @@ -164,6 +166,21 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { p2pServer: stack.Server(), discmix: enode.NewFairMix(0), shutdownTracker: shutdowncheck.NewShutdownTracker(chainDb), + blockReplicators: make([]*core.ChainReplicator, 0), + ReplicaConfig: &core.ReplicaConfig{ + EnableSpecimen: config.ReplicaEnableSpecimen, + EnableResult: config.ReplicaEnableResult, + EnableBlob: config.ReplicaEnableBlob, + HistoricalBlocksSynced: new(uint32), // Always set 0 for historical mode at start + }, + } + for _, targets := range config.BlockReplicationTargets { + replicator, err := CreateReplicator(targets) + if err != nil { + return nil, err + } + log.Info("Block replication started", "targets", targets, "network ID", config.NetworkId, "export block-specimen", eth.ReplicaConfig.EnableSpecimen, "export block-result", eth.ReplicaConfig.EnableResult, "export blob-specimen", eth.ReplicaConfig.EnableBlob) + eth.blockReplicators = append(eth.blockReplicators, replicator) } bcVersion := rawdb.ReadDatabaseVersion(chainDb) var dbVer = "" @@ -222,7 +239,10 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { return nil, err } eth.bloomIndexer.Start(eth.blockchain) - + eth.blockchain.SetBlockReplicaExports(eth.ReplicaConfig) + for _, bRRepl := range eth.blockReplicators { + bRRepl.Start(eth.blockchain, eth.ReplicaConfig) + } if config.BlobPool.Datadir != "" { config.BlobPool.Datadir = stack.ResolvePath(config.BlobPool.Datadir) } @@ -411,6 +431,9 @@ func (s *Ethereum) Stop() error { close(s.closeBloomHandler) s.txPool.Close() s.blockchain.Stop() + for _, repl := range s.blockReplicators { + repl.Stop() + } s.engine.Close() // Clean shutdown marker as the last thing before closing db diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index f1a815e6d..e3dc2ac6b 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -153,6 +153,14 @@ type Config struct { // OverrideVerkle (TODO: remove after the fork) OverrideVerkle *uint64 `toml:",omitempty"` + + // List of URIs to connect replication providers to + BlockReplicationTargets []string `toml:",omitempty"` + + // Bools that make explicit types being exported + ReplicaEnableResult bool + ReplicaEnableSpecimen bool + ReplicaEnableBlob bool } // CreateConsensusEngine creates a consensus engine for the given chain config. diff --git a/eth/redis_queue_replicator.go b/eth/redis_queue_replicator.go new file mode 100644 index 000000000..cfb97ba95 --- /dev/null +++ b/eth/redis_queue_replicator.go @@ -0,0 +1,69 @@ +package eth + +import ( + "context" + "errors" + "fmt" + "net/url" + + "github.com/ethereum/go-ethereum/core" + + "github.com/go-redis/redis/v7" + "github.com/golang/snappy" +) + +type RedisQueueReplicator struct { + rdb *redis.Client + qKey string + description string + compBuf []byte +} + +const redisQueueReplicatorCompBufSize = 20 * 1024 * 1024 + +func NewRedisQueueReplicator(rdbURL *url.URL) (*core.ChainReplicator, error) { + q := rdbURL.Query() + topic := q.Get("topic") + if len(topic) == 0 { + return nil, errors.New("redis replication target requires 'topic' query-param") + } + q.Del("topic") + rdbURL.RawQuery = q.Encode() + + rdbOpts, err := redis.ParseURL(rdbURL.String()) + if err != nil { + return nil, err + } + + backend := &RedisQueueReplicator{ + rdb: redis.NewClient(rdbOpts), + qKey: topic, + description: fmt.Sprintf("Redis(addr=%s,type=stream,key=%s)", rdbOpts.Addr, topic), + compBuf: make([]byte, 0, redisQueueReplicatorCompBufSize), + } + + return core.NewChainReplicator(backend), nil +} + +func (r *RedisQueueReplicator) String() string { + return r.description +} + +func (r *RedisQueueReplicator) Process(ctx context.Context, events []*core.BlockReplicationEvent) (err error) { + pipe := r.rdb.WithContext(ctx).Pipeline() + + for _, event := range events { + encodedData := snappy.Encode(nil, event.Data) + pipe.XAdd(&redis.XAddArgs{ + Stream: r.qKey, + MaxLenApprox: 500000, + Values: map[string]interface{}{ + "hash": event.Hash, + "data": encodedData, + }, + }) + } + + _, err = pipe.Exec() + return +} diff --git a/eth/replicators.go b/eth/replicators.go new file mode 100644 index 000000000..370d57a2a --- /dev/null +++ b/eth/replicators.go @@ -0,0 +1,24 @@ +package eth + +import ( + "fmt" + "net/url" + + "github.com/ethereum/go-ethereum/core" +) + +func CreateReplicator(target string) (*core.ChainReplicator, error) { + targetURL, err := url.Parse(target) + if err != nil { + return nil, err + } + + switch targetURL.Scheme { + case "redis", "rediss": + return NewRedisQueueReplicator(targetURL) + case "file": + return NewRLPFileSetReplicator(targetURL) + default: + return nil, fmt.Errorf("unknown replication-target URI scheme '%s'", targetURL.Scheme) + } +} diff --git a/eth/rlp_fileset_replicator.go b/eth/rlp_fileset_replicator.go new file mode 100644 index 000000000..faaf2849c --- /dev/null +++ b/eth/rlp_fileset_replicator.go @@ -0,0 +1,116 @@ +package eth + +import ( + "context" + "fmt" + "net/url" + "os" + "path" + "time" + + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/rlp" + + "github.com/golang/snappy" +) + +type RLPFileSetReplicator struct { + filesetBasePath string + nodeStartID uint64 + chunkSeq uint64 + chunkFile *os.File + chunkBytesWritten uint64 + description string + compBuf []byte +} + +const rlpFileReplicationChunkSizeLimit = 5 * 1024 * 1024 +const rlpFileReplicationCompBufSize = 20 * 1024 * 1024 + +func NewRLPFileSetReplicator(filesetBaseURI *url.URL) (*core.ChainReplicator, error) { + filesetBasePath := filesetBaseURI.Path + + err := os.MkdirAll(filesetBasePath, 0755) + if err != nil { + return nil, err + } + + nodeStartID := uint64(time.Now().Unix()) + + backend := &RLPFileSetReplicator{ + filesetBasePath: filesetBasePath, + nodeStartID: nodeStartID, + description: fmt.Sprintf("RLPFileSet(path=%s)", filesetBasePath), + compBuf: make([]byte, 0, rlpFileReplicationCompBufSize), + } + + err = backend.OpenNextChunk() + if err != nil { + return nil, err + } + + return core.NewChainReplicator(backend), nil +} + +func (r *RLPFileSetReplicator) OpenNextChunk() error { + if r.chunkFile != nil { + r.chunkFile.Close() + r.chunkSeq++ + } + + chunkFileName := fmt.Sprintf("sess-%08x-chunk-%08x.rlp", r.nodeStartID, r.chunkSeq) + chunkFilePath := path.Join(r.filesetBasePath, chunkFileName) + + f, err := os.Create(chunkFilePath) + if err != nil { + return err + } + + r.chunkFile = f + r.chunkBytesWritten = 0 + + return nil +} + +func (r *RLPFileSetReplicator) String() string { + return r.description +} + +func (r *RLPFileSetReplicator) Process(ctx context.Context, events []*core.BlockReplicationEvent) (err error) { + var ( + rlpData []byte + compRlpData []byte + bytesWrittenForStep uint64 + bytesWrittenForEvent int + ) + + for _, event := range events { + rlpData, err = rlp.EncodeToBytes([]interface{}{ + event.Hash, + event.Data, + }) + if err != nil { + return + } + + compRlpData = snappy.Encode(r.compBuf, rlpData) + + bytesWrittenForEvent, err = r.chunkFile.Write(compRlpData) + if err != nil { + return + } + bytesWrittenForStep += uint64(bytesWrittenForEvent) + } + + err = r.chunkFile.Sync() + if err != nil { + return + } + + r.chunkBytesWritten += bytesWrittenForStep + if r.chunkBytesWritten >= rlpFileReplicationChunkSizeLimit { + err = r.OpenNextChunk() + } + + return +} diff --git a/go.mod b/go.mod index c467cef16..e0f82af7b 100644 --- a/go.mod +++ b/go.mod @@ -105,6 +105,7 @@ require ( github.com/garslo/gogen v0.0.0-20170306192744-1d203ffc1f61 // indirect github.com/getsentry/sentry-go v0.27.0 // indirect github.com/go-ole/go-ole v1.3.0 // indirect + github.com/go-redis/redis/v7 v7.4.1 // indirect github.com/go-sourcemap/sourcemap v2.1.3+incompatible // indirect github.com/goccy/go-json v0.10.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect diff --git a/go.sum b/go.sum index 19a7b9d25..a86ef2f5c 100644 --- a/go.sum +++ b/go.sum @@ -203,6 +203,8 @@ github.com/go-ole/go-ole v1.3.0 h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE= github.com/go-ole/go-ole v1.3.0/go.mod h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW6zv78= github.com/go-openapi/jsonpointer v0.19.5/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg= github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk= +github.com/go-redis/redis/v7 v7.4.1 h1:PASvf36gyUpr2zdOUS/9Zqc80GbM+9BDyiJSJDDOrTI= +github.com/go-redis/redis/v7 v7.4.1/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg= github.com/go-sourcemap/sourcemap v2.1.3+incompatible h1:W1iEw64niKVGogNgBN3ePyLFfuisuzeidWPMPWmECqU= github.com/go-sourcemap/sourcemap v2.1.3+incompatible/go.mod h1:F8jJfvm2KbVjc5NqelyYJmf/v5J0dwNLS2mL4sNA1Jg= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= @@ -411,9 +413,11 @@ github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= github.com/onsi/ginkgo v1.14.0 h1:2mOpI4JVVPBN+WQRa0WKH2eXR+Ey+uK4n7Zj0aYpIQA= github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= +github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= @@ -579,6 +583,7 @@ golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -641,6 +646,7 @@ golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191010194322-b09406accb47/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/internal/ethapi/api_test.go b/internal/ethapi/api_test.go index f570c5dc4..313830176 100644 --- a/internal/ethapi/api_test.go +++ b/internal/ethapi/api_test.go @@ -624,6 +624,7 @@ func (b testBackend) BloomStatus() (uint64, uint64) { panic("implement me") } func (b testBackend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) { panic("implement me") } +func (b testBackend) SetHistoricalBlocksSynced() bool { return true } func TestEstimateGas(t *testing.T) { t.Parallel() diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index 82465ca7d..39d6fd9bc 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -95,6 +95,8 @@ type Backend interface { SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription BloomStatus() (uint64, uint64) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) + + SetHistoricalBlocksSynced() bool } func GetAPIs(apiBackend Backend) []rpc.API { diff --git a/internal/ethapi/transaction_args_test.go b/internal/ethapi/transaction_args_test.go index 5f59b491e..d5477edca 100644 --- a/internal/ethapi/transaction_args_test.go +++ b/internal/ethapi/transaction_args_test.go @@ -402,3 +402,8 @@ func (b *backendMock) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) } func (b *backendMock) Engine() consensus.Engine { return nil } + +func (b *backendMock) SetHistoricalBlocksSynced(synced bool) { + // This is a mock implementation, so we can leave it empty + // or add test-specific logic if needed +} diff --git a/miner/worker.go b/miner/worker.go index db2fac387..ce94a2a29 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -43,6 +43,8 @@ var ( errBlockInterruptedByTimeout = errors.New("timeout while building block") ) +var enableBlobTxSidecar bool + // environment is the worker's current environment and holds all // information of the sealing block generation. type environment struct { @@ -98,6 +100,9 @@ func (miner *Miner) generateWork(params *generateParams, witness bool) *newPaylo if err != nil { return &newPayloadResult{err: err} } + if miner.chain.ReplicaConfig.EnableBlob { + enableBlobTxSidecar = true + } if !params.noTxs { interrupt := new(atomic.Int32) timer := time.AfterFunc(miner.config.Recommit, func() { @@ -140,6 +145,21 @@ func (miner *Miner) generateWork(params *generateParams, witness bool) *newPaylo reqHash := types.CalcRequestsHash(requests) work.header.RequestsHash = &reqHash } + if enableBlobTxSidecar { + work.sidecars = make([]*types.BlobTxSidecar, len(work.sidecars)) + copy(work.sidecars, work.sidecars) + types.BlobTxSidecarChan = make(chan *types.BlobTxSidecarData, 100) + go func() { + for sidecar := range work.sidecars { + types.BlobTxSidecarChan <- &types.BlobTxSidecarData{ + Blobs: work.sidecars[sidecar], + BlockNumber: work.header.Number, + } + } + log.Info("Closing Chain Sync BlobTxSidecar Channel For", "Block Number:", work.header.Number.Uint64(), "Length:", len(types.BlobTxSidecarChan)) + close(types.BlobTxSidecarChan) + }() + } block, err := miner.engine.FinalizeAndAssemble(miner.chain, work.header, work.state, &body, work.receipts) if err != nil { diff --git a/params/version.go b/params/version.go new file mode 100644 index 000000000..6dd4468e9 --- /dev/null +++ b/params/version.go @@ -0,0 +1,78 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package params + +import ( + "fmt" +) + +const ( + VersionMajor = 1 // Major version component of the current release + VersionMinor = 14 // Minor version component of the current release + VersionPatch = 12 // Patch version component of the current release + VersionMeta = "stable" // Version metadata to append to the version string +) + +const ( + BspVersionMajor = 1 // Major version component of the current release + BspVersionMinor = 9 // Minor version component of the current release + BspVersionPatch = 0 // Patch version component of the current release +) + +// Version holds the textual version string. +var Version = func() string { + return fmt.Sprintf("%d.%d.%d", VersionMajor, VersionMinor, VersionPatch) +}() + +// BspVersion holds the textual version string. +var BspVersion = func() string { + return fmt.Sprintf("%d.%d.%d-%v", BspVersionMajor, BspVersionMinor, BspVersionPatch, "bsp") +}() + +// VersionWithMeta holds the textual version string including the metadata. +var VersionWithMeta = func() string { + v := Version + if VersionMeta != "" { + v += "-" + VersionMeta + } + return v +}() + +// ArchiveVersion holds the textual version string used for Geth archives. e.g. +// "1.8.11-dea1ce05" for stable releases, or "1.8.13-unstable-21c059b6" for unstable +// releases. +func ArchiveVersion(gitCommit string) string { + vsn := Version + if VersionMeta != "stable" { + vsn += "-" + VersionMeta + } + if len(gitCommit) >= 8 { + vsn += "-" + gitCommit[:8] + } + return vsn +} + +func VersionWithCommit(gitCommit, gitDate string) string { + vsn := VersionWithMeta + if len(gitCommit) >= 8 { + vsn += "-" + gitCommit[:8] + } + if (VersionMeta != "stable") && (gitDate != "") { + vsn += "-" + gitDate + } + return vsn +}