Skip to content

Commit

Permalink
repatch state-specimen, block-specimen & blob-specimen producer
Browse files Browse the repository at this point in the history
Signed-off-by: Pranay Valson <[email protected]>
  • Loading branch information
noslav committed Jan 14, 2025
1 parent 293a300 commit f536e11
Show file tree
Hide file tree
Showing 30 changed files with 1,404 additions and 30 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,4 @@ profile.cov
.vscode

tests/spec-tests/
bin/
10 changes: 9 additions & 1 deletion cmd/geth/chaincmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ if one is set. Otherwise it prints the genesis from the datadir.`,
utils.VMTraceJsonConfigFlag,
utils.TransactionHistoryFlag,
utils.StateHistoryFlag,
utils.BlockReplicationTargetsFlag,
utils.ReplicaEnableSpecimenFlag,
utils.ReplicaEnableResultFlag,
utils.ReplicaEnableBlobFlag,
}, utils.DatabaseFlags),
Description: `
The import command imports blocks from an RLP-encoded form. The form can be one file
Expand Down Expand Up @@ -287,12 +291,15 @@ func importChain(ctx *cli.Context) error {
// Start system runtime metrics collection
go metrics.CollectProcessMetrics(3 * time.Second)

stack, _ := makeConfigNode(ctx)
stack, cfg := makeConfigNode(ctx)
defer stack.Close()
replicators := utils.CreateReplicators(&cfg.Eth)

chain, db := utils.MakeChain(ctx, stack, false)
defer db.Close()

utils.AttachReplicators(replicators, chain)

// Start periodically gathering memory profiles
var peakMemAlloc, peakMemSys atomic.Uint64
go func() {
Expand Down Expand Up @@ -327,6 +334,7 @@ func importChain(ctx *cli.Context) error {
}
}
chain.Stop()
utils.DrainReplicators(replicators)
fmt.Printf("Import done in %v.\n\n", time.Since(start))

// Output pre-compaction stats mostly to see the import trashing
Expand Down
5 changes: 3 additions & 2 deletions cmd/geth/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/eth/catalyst"
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/internal/ethapi"
"github.com/ethereum/go-ethereum/internal/flags"
"github.com/ethereum/go-ethereum/internal/version"
"github.com/ethereum/go-ethereum/log"
Expand Down Expand Up @@ -181,7 +182,7 @@ func makeConfigNode(ctx *cli.Context) (*node.Node, gethConfig) {
}

// makeFullNode loads geth configuration and creates the Ethereum backend.
func makeFullNode(ctx *cli.Context) *node.Node {
func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
stack, cfg := makeConfigNode(ctx)
if ctx.IsSet(utils.OverrideCancun.Name) {
v := ctx.Uint64(utils.OverrideCancun.Name)
Expand Down Expand Up @@ -250,7 +251,7 @@ func makeFullNode(ctx *cli.Context) *node.Node {
utils.Fatalf("failed to register catalyst service: %v", err)
}
}
return stack
return stack, backend
}

// dumpConfig is the dumpconfig command.
Expand Down
4 changes: 2 additions & 2 deletions cmd/geth/consolecmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ JavaScript API. See https://geth.ethereum.org/docs/interacting-with-geth/javascr
func localConsole(ctx *cli.Context) error {
// Create and start the node based on the CLI flags
prepare(ctx)
stack := makeFullNode(ctx)
startNode(ctx, stack, true)
stack, backend := makeFullNode(ctx)
startNode(ctx, stack, true, backend)
defer stack.Close()

// Attach to the newly started node and create the JavaScript console.
Expand Down
43 changes: 40 additions & 3 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/internal/debug"
"github.com/ethereum/go-ethereum/internal/ethapi"
"github.com/ethereum/go-ethereum/internal/flags"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
Expand Down Expand Up @@ -155,6 +156,10 @@ var (
utils.BeaconGenesisRootFlag,
utils.BeaconGenesisTimeFlag,
utils.BeaconCheckpointFlag,
utils.BlockReplicationTargetsFlag,
utils.ReplicaEnableResultFlag,
utils.ReplicaEnableSpecimenFlag,
utils.ReplicaEnableBlobFlag,
}, utils.NetworkFlags, utils.DatabaseFlags)

rpcFlags = []cli.Flag{
Expand Down Expand Up @@ -342,17 +347,17 @@ func geth(ctx *cli.Context) error {
}

prepare(ctx)
stack := makeFullNode(ctx)
stack, backend := makeFullNode(ctx)
defer stack.Close()

startNode(ctx, stack, false)
startNode(ctx, stack, false, backend)
stack.Wait()
return nil
}

// startNode boots up the system node and all registered protocols, after which
// it starts the RPC/IPC interfaces and the miner.
func startNode(ctx *cli.Context, stack *node.Node, isConsole bool) {
func startNode(ctx *cli.Context, stack *node.Node, isConsole bool, backend ethapi.Backend) {
// Start up the node itself
utils.StartNode(ctx, stack, isConsole)

Expand Down Expand Up @@ -401,6 +406,38 @@ func startNode(ctx *cli.Context, stack *node.Node, isConsole bool) {
}
}()

// Kill bsp-geth if --syncmode flag is 'light'
if ctx.String(utils.BlockReplicationTargetsFlag.Name) != "" && ctx.String(utils.SyncModeFlag.Name) == "light" {
utils.Fatalf("Block specimen production not supported for 'light' sync (only supported modes are 'snap' and 'full'")
}

// Spawn a standalone goroutine for status synchronization monitoring,
// if full sync is completed in block specimen creation mode set replica config flag
if ctx.Bool(utils.ReplicaEnableSpecimenFlag.Name) || ctx.Bool(utils.ReplicaEnableResultFlag.Name) {
//log.Info("Synchronisation started, historical blocks synced set to 0")
backend.SetHistoricalBlocksSynced()

go func() {
sub := stack.EventMux().Subscribe(downloader.DoneEvent{})
defer sub.Unsubscribe()
for {
event := <-sub.Chan()
if event == nil {
continue
}
done, ok := event.Data.(downloader.DoneEvent)
if !ok {
continue
}
if timestamp := time.Unix(int64(done.Latest.Time), 0); time.Since(timestamp) < 10*time.Minute {
log.Info("Synchronisation completed, setting historical blocks synced to 1", "latestnum", done.Latest.Number, "latesthash", done.Latest.Hash(),
"age", common.PrettyAge(timestamp))
backend.SetHistoricalBlocksSynced()
}
}
}()
}

// Spawn a standalone goroutine for status synchronization monitoring,
// close the node when synchronization is complete if user required.
if ctx.Bool(utils.ExitWhenSyncedFlag.Name) {
Expand Down
2 changes: 2 additions & 0 deletions cmd/geth/misccmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"strings"

"github.com/ethereum/go-ethereum/internal/version"
"github.com/ethereum/go-ethereum/params"
"github.com/urfave/cli/v2"
)

Expand Down Expand Up @@ -73,6 +74,7 @@ func printVersion(ctx *cli.Context) error {

fmt.Println(strings.Title(clientIdentifier))
fmt.Println("Version:", version.WithMeta)
fmt.Println("Bsp Version:", params.BspVersion)
if git.Commit != "" {
fmt.Println("Git Commit:", git.Commit)
}
Expand Down
82 changes: 81 additions & 1 deletion cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"math/big"
"net"
"net/http"
"net/url"
"os"
"path/filepath"
godebug "runtime/debug"
Expand Down Expand Up @@ -723,6 +724,23 @@ var (
Value: node.DefaultConfig.BatchResponseMaxSize,
Category: flags.APICategory,
}
BlockReplicationTargetsFlag = &cli.StringFlag{
Name: "replication.targets",
Usage: "Comma separated URLs for message-queue delivery of block specimens",
Value: "",
}
ReplicaEnableSpecimenFlag = &cli.BoolFlag{
Name: "replica.specimen",
Usage: "Enables export of fields that comprise a block-specimen",
}
ReplicaEnableResultFlag = &cli.BoolFlag{
Name: "replica.result",
Usage: "Enables export of fields that comprise a block-result",
}
ReplicaEnableBlobFlag = &cli.BoolFlag{
Name: "replica.blob",
Usage: "Enables export of fields that comprise a block-blob",
}

// Network Settings
MaxPeersFlag = &cli.IntFlag{
Expand Down Expand Up @@ -1584,7 +1602,9 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
setMiner(ctx, &cfg.Miner)
setRequiredBlocks(ctx, cfg)
setLes(ctx, cfg)

if ctx.IsSet(BlockReplicationTargetsFlag.Name) {
setBlockReplicationTargets(ctx, cfg)
}
// Cap the cache allowance and tune the garbage collector
mem, err := gopsutil.VirtualMemory()
if err == nil {
Expand Down Expand Up @@ -2245,3 +2265,63 @@ func MakeTrieDatabase(ctx *cli.Context, disk ethdb.Database, preimage bool, read
}
return triedb.NewDatabase(disk, config)
}

// setBlockResultTargets creates a list of replication targets from the command line flags.
func setBlockReplicationTargets(ctx *cli.Context, cfg *eth.Config) {
var urls []string

if ctx.IsSet(BlockReplicationTargetsFlag.Name) {
urls = strings.Split(ctx.String(BlockReplicationTargetsFlag.Name), ",")
}

cfg.BlockReplicationTargets = make([]string, 0, len(urls))
for _, urlStr := range urls {
if urlStr != "" {
_, err := url.Parse(urlStr)
if err != nil {
log.Crit("Replication-target URL invalid", "url", urlStr, "err", err)
os.Exit(1)
}
cfg.BlockReplicationTargets = append(cfg.BlockReplicationTargets, urlStr)
}
}
if ctx.IsSet(ReplicaEnableResultFlag.Name) || ctx.IsSet(ReplicaEnableSpecimenFlag.Name) {
if ctx.Bool(ReplicaEnableSpecimenFlag.Name) {
cfg.ReplicaEnableSpecimen = true
}
if ctx.Bool(ReplicaEnableResultFlag.Name) {
cfg.ReplicaEnableResult = true
}
if ctx.Bool(ReplicaEnableBlobFlag.Name) {
cfg.ReplicaEnableBlob = true
}
} else {
Fatalf("--replication.targets flag is invalid without --replica.specimen and/or --replica.result, ONLY ADD --replica.blob with both replica.specimen AND replica.result flags for complete unified state capture)")
}
}

func CreateReplicators(config *eth.Config) []*core.ChainReplicator {
replicators := make([]*core.ChainReplicator, 0)

for _, blockReplicationTargets := range config.BlockReplicationTargets {
blockRepl, err := eth.CreateReplicator(blockReplicationTargets)
if err != nil {
Fatalf("Can't create replication target: %v", err)
}
replicators = append(replicators, blockRepl)
}

return replicators
}

func AttachReplicators(replicators []*core.ChainReplicator, chain *core.BlockChain) {
for _, replicator := range replicators {
replicator.Start(chain, chain.ReplicaConfig)
}
}

func DrainReplicators(replicators []*core.ChainReplicator) {
for _, replicator := range replicators {
replicator.Stop()
}
}
Loading

0 comments on commit f536e11

Please sign in to comment.