From 0e7f58bade4d7ac02809a176d297d454f6d7f6e9 Mon Sep 17 00:00:00 2001 From: VM Date: Thu, 14 Nov 2024 22:22:40 +0800 Subject: [PATCH 1/5] fix: recover node buffer list trie nodes for graceful kill --- triedb/pathdb/database.go | 1 + triedb/pathdb/journal.go | 25 +++++-- triedb/pathdb/nodebufferlist.go | 120 +++++++++++++------------------- 3 files changed, 72 insertions(+), 74 deletions(-) diff --git a/triedb/pathdb/database.go b/triedb/pathdb/database.go index d2cd0b9975..6834fe3554 100644 --- a/triedb/pathdb/database.go +++ b/triedb/pathdb/database.go @@ -180,6 +180,7 @@ func New(diskdb ethdb.Database, config *Config) *Database { diskdb: diskdb, useBase: config.UseBase, } + fmt.Println("useBase", db.useBase) // Open the freezer for state history if the passed database contains an // ancient store. Otherwise, all the relevant functionalities are disabled. diff --git a/triedb/pathdb/journal.go b/triedb/pathdb/journal.go index 586e2800e6..a24a9c1c2b 100644 --- a/triedb/pathdb/journal.go +++ b/triedb/pathdb/journal.go @@ -255,6 +255,8 @@ func (db *Database) loadLayers() layer { if err == nil { return head } + fmt.Println("load layers error: ", err) + log.Error("print load journal error", "error", err) // journal is not matched(or missing) with the persistent state, discard // it. Display log for discarding journal, but try to avoid showing // useless information when the db is created from scratch. @@ -268,12 +270,11 @@ func (db *Database) loadLayers() layer { stateID = rawdb.ReadPersistentStateID(db.diskdb) ) + fmt.Println("use base: ", db.useBase) if (errors.Is(err, errMissJournal) || errors.Is(err, errUnmatchedJournal)) && db.fastRecovery && db.config.TrieNodeBufferType == NodeBufferList && !db.useBase { + fmt.Println("3j3erj321") start := time.Now() - if db.freezer == nil { - log.Crit("Use unopened freezer db to recover node buffer list") - } log.Info("Recover node buffer list from ancient db") nb, err = NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nil, 0, @@ -287,6 +288,7 @@ func (db *Database) loadLayers() layer { } } if nb == nil || err != nil { + fmt.Println("r23k9321k9") // Return single layer with persistent state. nb, err = NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nil, 0, db.config.ProposeBlockInterval, db.config.NotifyKeep, nil, false, db.useBase) @@ -365,11 +367,26 @@ func (db *Database) loadDiskLayer(r *rlp.Stream, journalTypeForReader JournalTyp // Calculate the internal state transitions by id difference. nb, err := NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nodes, id-stored, db.config.ProposeBlockInterval, - db.config.NotifyKeep, nil, false, db.useBase) + db.config.NotifyKeep, db.freezer, db.fastRecovery, db.useBase) if err != nil { log.Error("Failed to new trie node buffer", "error", err) return nil, err } + + fmt.Println("111") + if db.config.TrieNodeBufferType == NodeBufferList && !db.useBase { + fmt.Println("222") + recoveredRoot, recoveredStateID, _ := nb.getLatestStatus() + if recoveredRoot != root && recoveredStateID != id { + log.Error("unequal state root and state id") + fmt.Println("recoveredRoot, root, recoveredStateID, id", recoveredRoot, root, recoveredStateID, id) + return nil, errors.New("Unmatched root and state id with recovered") + } + + log.Info("Finish recovering node buffer list", "latest root hash", recoveredRoot.String(), + "latest state_id", recoveredStateID) + } + base := newDiskLayer(root, id, db, nil, nb) nb.setClean(base.cleans) return base, nil diff --git a/triedb/pathdb/nodebufferlist.go b/triedb/pathdb/nodebufferlist.go index 2128892221..71cb52ddf2 100644 --- a/triedb/pathdb/nodebufferlist.go +++ b/triedb/pathdb/nodebufferlist.go @@ -111,110 +111,90 @@ func newNodeBufferList( if nodes == nil { nodes = make(map[common.Hash]map[string]*trienode.Node) } - var size uint64 - for _, subset := range nodes { - for path, n := range subset { - size += uint64(len(n.Blob) + len(path)) - } + + nf := &nodebufferlist{ + db: db, + wpBlocks: wpBlocks, + rsevMdNum: rsevMdNum, + dlInMd: dlInMd, + limit: limit, + base: newMultiDifflayer(limit, 0, common.Hash{}, nodes, 0), + persistID: rawdb.ReadPersistentStateID(db), + stopCh: make(chan struct{}), + waitStopCh: make(chan struct{}), + forceKeepCh: make(chan struct{}), + waitForceKeepCh: make(chan struct{}), + keepFunc: keepFunc, } - base := newMultiDifflayer(limit, size, common.Hash{}, nodes, layers) - var ( - nf *nodebufferlist - err error - ) if !useBase && fastRecovery { - nf, err = recoverNodeBufferList(db, freezer, base, limit, wpBlocks, rsevMdNum, dlInMd) - if err != nil { + if freezer == nil { + log.Crit("Use unopened freezer db to recover node buffer list") + } + + if err := nf.recoverNodeBufferList(freezer); err != nil { log.Error("Failed to recover node buffer list", "error", err) return nil, err } } else { ele := newMultiDifflayer(limit, 0, common.Hash{}, make(map[common.Hash]map[string]*trienode.Node), 0) - nf = &nodebufferlist{ - db: db, - wpBlocks: wpBlocks, - rsevMdNum: rsevMdNum, - dlInMd: dlInMd, - limit: limit, - base: base, - head: ele, - tail: ele, - count: 1, - persistID: rawdb.ReadPersistentStateID(db), - stopCh: make(chan struct{}), - waitStopCh: make(chan struct{}), - forceKeepCh: make(chan struct{}), - waitForceKeepCh: make(chan struct{}), - keepFunc: keepFunc, - } - nf.useBase.Store(useBase) + nf.head = ele + nf.tail = ele + nf.count = 1 } + nf.useBase.Store(useBase) go nf.loop() log.Info("new node buffer list", "proposed block interval", nf.wpBlocks, - "reserve multi difflayers", nf.rsevMdNum, "difflayers in multidifflayer", nf.dlInMd, - "limit", common.StorageSize(limit), "layers", layers, "persist id", nf.persistID, "base_size", size) + "reserve multi diff_layers", nf.rsevMdNum, "diff_layers in multi_diff_layer", nf.dlInMd, + "limit", common.StorageSize(limit), "layers", layers, "persist_id", nf.persistID, "base_size", nf.size) return nf, nil } // recoverNodeBufferList recovers node buffer list -func recoverNodeBufferList(db ethdb.Database, freezer *rawdb.ResettableFreezer, base *multiDifflayer, - limit, wpBlocks, rsevMdNum, dlInMd uint64) (*nodebufferlist, error) { - nbl := &nodebufferlist{ - db: db, - wpBlocks: wpBlocks, - rsevMdNum: rsevMdNum, - dlInMd: dlInMd, - limit: limit, - base: base, - persistID: rawdb.ReadPersistentStateID(db), - stopCh: make(chan struct{}), - waitStopCh: make(chan struct{}), - forceKeepCh: make(chan struct{}), - waitForceKeepCh: make(chan struct{}), - } +func (nf *nodebufferlist) recoverNodeBufferList(freezer *rawdb.ResettableFreezer) error { head, err := freezer.Ancients() if err != nil { log.Error("Failed to get freezer ancients", "error", err) - return nil, err + return err } tail, err := freezer.Tail() if err != nil { log.Error("Failed to get freezer tail", "error", err) - return nil, err + return err } - log.Info("Ancient db meta info", "persistent_state_id", nbl.persistID, "head_state_id", head, - "tail_state_id", tail, "waiting_recover_num", head-nbl.persistID) + fmt.Println() + log.Info("Ancient db meta info", "persistent_state_id", nf.persistID, "head_state_id", head, + "tail_state_id", tail, "waiting_recover_num", head-nf.persistID) - startStateID := nbl.persistID + 1 + startStateID := nf.persistID + 1 startBlock, err := readBlockNumber(freezer, startStateID) if err != nil { log.Error("Failed to read start block number", "error", err, "tail_state_id", startStateID) - return nil, err + return err } endBlock, err := readBlockNumber(freezer, head) if err != nil { log.Error("Failed to read end block number", "error", err, "head_state_id", head) - return nil, err + return err } - blockIntervals := nbl.createBlockInterval(startBlock, endBlock) - stateIntervals, err := nbl.createStateInterval(freezer, startStateID, head, blockIntervals) + blockIntervals := nf.createBlockInterval(startBlock, endBlock) + stateIntervals, err := nf.createStateInterval(freezer, startStateID, head, blockIntervals) if err != nil { - return nil, err + return err } - log.Info("block intervals info", "blockIntervals", blockIntervals, "stateIntervals", stateIntervals, - "startBlock", startBlock, "endBlock", endBlock) + log.Info("block intervals info", "block_intervals", blockIntervals, "state_intervals", stateIntervals, + "start_block", startBlock, "end_block", endBlock) var eg errgroup.Group - nbl.linkMultiDiffLayers(len(blockIntervals)) - for current, i := nbl.head, 0; current != nil; current, i = current.next, i+1 { + nf.linkMultiDiffLayers(len(blockIntervals)) + for current, i := nf.head, 0; current != nil; current, i = current.next, i+1 { index := i mdl := current eg.Go(func() error { for j := stateIntervals[index][0]; j <= stateIntervals[index][1]; j++ { - h, err := nbl.readStateHistory(freezer, j) + h, err := nf.readStateHistory(freezer, j) if err != nil { log.Error("Failed to read state history", "error", err) return err @@ -228,18 +208,18 @@ func recoverNodeBufferList(db ethdb.Database, freezer *rawdb.ResettableFreezer, }) } if err = eg.Wait(); err != nil { - return nil, err + return err } - for current, i := nbl.head, 0; current != nil; current, i = current.next, i+1 { - nbl.size += current.size - nbl.layers += current.layers + for current, i := nf.head, 0; current != nil; current, i = current.next, i+1 { + nf.size += current.size + nf.layers += current.layers } - nbl.diffToBase() + nf.diffToBase() - log.Info("Succeed to add diff layer", "base_size", nbl.base.size, "tail_state_id", nbl.tail.id, - "head_state_id", nbl.head.id, "nbl_layers", nbl.layers, "base_layers", nbl.base.layers) - return nbl, nil + log.Info("Succeed to add diff layer", "base_size", nf.base.size, "tail_state_id", nf.tail.id, + "head_state_id", nf.head.id, "nbl_layers", nf.layers, "base_layers", nf.base.layers) + return nil } // linkMultiDiffLayers links specified amount of multiDiffLayers for recovering From 7389464a410f66b88f46298807785f67a81f84d0 Mon Sep 17 00:00:00 2001 From: VM Date: Fri, 15 Nov 2024 23:37:37 +0800 Subject: [PATCH 2/5] test: fix unit tests --- core/blockchain.go | 1 + core/blockchain_snapshot_test.go | 30 ++++++++++++++++++++++-------- triedb/pathdb/database.go | 3 +-- triedb/pathdb/database_test.go | 3 ++- triedb/pathdb/journal.go | 11 +++-------- triedb/pathdb/nodebufferlist.go | 12 ++++-------- 6 files changed, 33 insertions(+), 27 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 825dce4e0d..dec63d2c40 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -195,6 +195,7 @@ func (c *CacheConfig) triedbConfig(keepFunc pathdb.NotifyKeepFunc) *triedb.Confi NotifyKeep: keepFunc, JournalFilePath: c.JournalFilePath, JournalFile: c.JournalFile, + UseBase: c.UseBase, } } return config diff --git a/core/blockchain_snapshot_test.go b/core/blockchain_snapshot_test.go index 348cc3f473..e2a5a2c9c7 100644 --- a/core/blockchain_snapshot_test.go +++ b/core/blockchain_snapshot_test.go @@ -81,7 +81,9 @@ func (basic *snapshotTestBasic) prepare(t *testing.T) (*BlockChain, []*types.Blo } engine = ethash.NewFullFaker() ) - chain, err := NewBlockChain(db, DefaultCacheConfigWithScheme(basic.scheme), gspec, nil, engine, vm.Config{}, nil, nil) + cacheConfig := DefaultCacheConfigWithScheme(basic.scheme) + cacheConfig.UseBase = true + chain, err := NewBlockChain(db, cacheConfig, gspec, nil, engine, vm.Config{}, nil, nil) if err != nil { t.Fatalf("Failed to create chain: %v", err) } @@ -180,11 +182,11 @@ func (basic *snapshotTestBasic) dump() string { } fmt.Fprint(buffer, "\n") - //if crash { + // if crash { // fmt.Fprintf(buffer, "\nCRASH\n\n") - //} else { + // } else { // fmt.Fprintf(buffer, "\nSetHead(%d)\n\n", basic.setHead) - //} + // } fmt.Fprintf(buffer, "------------------------------\n\n") fmt.Fprint(buffer, "Expected in leveldb:\n G") @@ -228,7 +230,10 @@ func (snaptest *snapshotTest) test(t *testing.T) { // Restart the chain normally chain.Stop() - newchain, err := NewBlockChain(snaptest.db, DefaultCacheConfigWithScheme(snaptest.scheme), snaptest.gspec, nil, snaptest.engine, vm.Config{}, nil, nil) + + cacheConfig := DefaultCacheConfigWithScheme(snaptest.scheme) + cacheConfig.UseBase = true + newchain, err := NewBlockChain(snaptest.db, cacheConfig, snaptest.gspec, nil, snaptest.engine, vm.Config{}, nil, nil) if err != nil { t.Fatalf("Failed to recreate chain: %v", err) } @@ -313,6 +318,7 @@ func (snaptest *gappedSnapshotTest) test(t *testing.T) { SnapshotLimit: 0, StateScheme: snaptest.scheme, } + cacheConfig.UseBase = true newchain, err := NewBlockChain(snaptest.db, cacheConfig, snaptest.gspec, nil, snaptest.engine, vm.Config{}, nil, nil) if err != nil { t.Fatalf("Failed to recreate chain: %v", err) @@ -321,7 +327,9 @@ func (snaptest *gappedSnapshotTest) test(t *testing.T) { newchain.Stop() // Restart the chain with enabling the snapshot - newchain, err = NewBlockChain(snaptest.db, DefaultCacheConfigWithScheme(snaptest.scheme), snaptest.gspec, nil, snaptest.engine, vm.Config{}, nil, nil) + config := DefaultCacheConfigWithScheme(snaptest.scheme) + config.UseBase = true + newchain, err = NewBlockChain(snaptest.db, config, snaptest.gspec, nil, snaptest.engine, vm.Config{}, nil, nil) if err != nil { t.Fatalf("Failed to recreate chain: %v", err) } @@ -349,7 +357,9 @@ func (snaptest *setHeadSnapshotTest) test(t *testing.T) { chain.SetHead(snaptest.setHead) chain.Stop() - newchain, err := NewBlockChain(snaptest.db, DefaultCacheConfigWithScheme(snaptest.scheme), snaptest.gspec, nil, snaptest.engine, vm.Config{}, nil, nil) + cacheConfig := DefaultCacheConfigWithScheme(snaptest.scheme) + cacheConfig.UseBase = true + newchain, err := NewBlockChain(snaptest.db, cacheConfig, snaptest.gspec, nil, snaptest.engine, vm.Config{}, nil, nil) if err != nil { t.Fatalf("Failed to recreate chain: %v", err) } @@ -385,6 +395,7 @@ func (snaptest *wipeCrashSnapshotTest) test(t *testing.T) { SnapshotLimit: 0, StateScheme: snaptest.scheme, } + config.UseBase = true newchain, err := NewBlockChain(snaptest.db, config, snaptest.gspec, nil, snaptest.engine, vm.Config{}, nil, nil) if err != nil { t.Fatalf("Failed to recreate chain: %v", err) @@ -402,6 +413,7 @@ func (snaptest *wipeCrashSnapshotTest) test(t *testing.T) { SnapshotWait: false, // Don't wait rebuild StateScheme: snaptest.scheme, } + config.UseBase = true tmp, err := NewBlockChain(snaptest.db, config, snaptest.gspec, nil, snaptest.engine, vm.Config{}, nil, nil) if err != nil { t.Fatalf("Failed to recreate chain: %v", err) @@ -411,7 +423,9 @@ func (snaptest *wipeCrashSnapshotTest) test(t *testing.T) { tmp.triedb.Close() tmp.stopWithoutSaving() - newchain, err = NewBlockChain(snaptest.db, DefaultCacheConfigWithScheme(snaptest.scheme), snaptest.gspec, nil, snaptest.engine, vm.Config{}, nil, nil) + cacheConfig := DefaultCacheConfigWithScheme(snaptest.scheme) + cacheConfig.UseBase = true + newchain, err = NewBlockChain(snaptest.db, cacheConfig, snaptest.gspec, nil, snaptest.engine, vm.Config{}, nil, nil) if err != nil { t.Fatalf("Failed to recreate chain: %v", err) } diff --git a/triedb/pathdb/database.go b/triedb/pathdb/database.go index 6834fe3554..e9d5c82308 100644 --- a/triedb/pathdb/database.go +++ b/triedb/pathdb/database.go @@ -180,7 +180,6 @@ func New(diskdb ethdb.Database, config *Config) *Database { diskdb: diskdb, useBase: config.UseBase, } - fmt.Println("useBase", db.useBase) // Open the freezer for state history if the passed database contains an // ancient store. Otherwise, all the relevant functionalities are disabled. @@ -369,7 +368,7 @@ func (db *Database) Enable(root common.Hash) error { // Re-construct a new disk layer backed by persistent state // with **empty clean cache and node buffer**. nb, err := NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nil, 0, db.config.ProposeBlockInterval, - db.config.NotifyKeep, nil, false, false) + db.config.NotifyKeep, db.freezer, false, false) if err != nil { log.Error("Failed to new trie node buffer", "error", err) return err diff --git a/triedb/pathdb/database_test.go b/triedb/pathdb/database_test.go index a51f339be6..3674064cf3 100644 --- a/triedb/pathdb/database_test.go +++ b/triedb/pathdb/database_test.go @@ -319,6 +319,7 @@ func (t *tester) verifyState(root common.Hash) error { } _, err = reader.Node(common.Hash{}, nil, root) if err != nil { + fmt.Println("error: ", err) return errors.New("root node is not available") } for addrHash, account := range t.snapAccounts[root] { @@ -459,6 +460,7 @@ func TestDisable(t *testing.T) { t.Fatalf("Invalid activation should be rejected") } if err := tester.db.Enable(stored); err != nil { + fmt.Println(err) t.Fatal("Failed to activate database") } @@ -516,7 +518,6 @@ func TestJournal(t *testing.T) { } tester.db.Close() pathConfig := Defaults - pathConfig.UseBase = true tester.db = New(tester.db.diskdb, pathConfig) // Verify states including disk layer and all diff on top. diff --git a/triedb/pathdb/journal.go b/triedb/pathdb/journal.go index a24a9c1c2b..4579bb41aa 100644 --- a/triedb/pathdb/journal.go +++ b/triedb/pathdb/journal.go @@ -250,13 +250,12 @@ func (db *Database) loadLayers() layer { _, root := rawdb.ReadAccountTrieNode(db.diskdb, nil) root = types.TrieRootHash(root) + fmt.Println("1 useBase, fastRecovery", db.useBase, db.fastRecovery) // Load the layers by resolving the journal head, err := db.loadJournal(root) if err == nil { return head } - fmt.Println("load layers error: ", err) - log.Error("print load journal error", "error", err) // journal is not matched(or missing) with the persistent state, discard // it. Display log for discarding journal, but try to avoid showing // useless information when the db is created from scratch. @@ -270,10 +269,9 @@ func (db *Database) loadLayers() layer { stateID = rawdb.ReadPersistentStateID(db.diskdb) ) - fmt.Println("use base: ", db.useBase) + fmt.Println("2 useBase, fastRecovery", db.useBase, db.fastRecovery) if (errors.Is(err, errMissJournal) || errors.Is(err, errUnmatchedJournal)) && db.fastRecovery && db.config.TrieNodeBufferType == NodeBufferList && !db.useBase { - fmt.Println("3j3erj321") start := time.Now() log.Info("Recover node buffer list from ancient db") @@ -288,7 +286,6 @@ func (db *Database) loadLayers() layer { } } if nb == nil || err != nil { - fmt.Println("r23k9321k9") // Return single layer with persistent state. nb, err = NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nil, 0, db.config.ProposeBlockInterval, db.config.NotifyKeep, nil, false, db.useBase) @@ -365,6 +362,7 @@ func (db *Database) loadDiskLayer(r *rlp.Stream, journalTypeForReader JournalTyp } } + fmt.Println("3 useBase, fastRecovery", db.useBase, db.fastRecovery) // Calculate the internal state transitions by id difference. nb, err := NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nodes, id-stored, db.config.ProposeBlockInterval, db.config.NotifyKeep, db.freezer, db.fastRecovery, db.useBase) @@ -373,13 +371,10 @@ func (db *Database) loadDiskLayer(r *rlp.Stream, journalTypeForReader JournalTyp return nil, err } - fmt.Println("111") if db.config.TrieNodeBufferType == NodeBufferList && !db.useBase { - fmt.Println("222") recoveredRoot, recoveredStateID, _ := nb.getLatestStatus() if recoveredRoot != root && recoveredStateID != id { log.Error("unequal state root and state id") - fmt.Println("recoveredRoot, root, recoveredStateID, id", recoveredRoot, root, recoveredStateID, id) return nil, errors.New("Unmatched root and state id with recovered") } diff --git a/triedb/pathdb/nodebufferlist.go b/triedb/pathdb/nodebufferlist.go index 71cb52ddf2..d1b43d70e0 100644 --- a/triedb/pathdb/nodebufferlist.go +++ b/triedb/pathdb/nodebufferlist.go @@ -108,17 +108,13 @@ func newNodeBufferList( dlInMd = wpBlocks } - if nodes == nil { - nodes = make(map[common.Hash]map[string]*trienode.Node) - } - nf := &nodebufferlist{ db: db, wpBlocks: wpBlocks, rsevMdNum: rsevMdNum, dlInMd: dlInMd, limit: limit, - base: newMultiDifflayer(limit, 0, common.Hash{}, nodes, 0), + base: newMultiDifflayer(limit, 0, common.Hash{}, make(map[common.Hash]map[string]*trienode.Node), 0), persistID: rawdb.ReadPersistentStateID(db), stopCh: make(chan struct{}), waitStopCh: make(chan struct{}), @@ -127,6 +123,7 @@ func newNodeBufferList( keepFunc: keepFunc, } + fmt.Println("useBase, fastRecovery", useBase, fastRecovery) if !useBase && fastRecovery { if freezer == nil { log.Crit("Use unopened freezer db to recover node buffer list") @@ -164,7 +161,6 @@ func (nf *nodebufferlist) recoverNodeBufferList(freezer *rawdb.ResettableFreezer log.Error("Failed to get freezer tail", "error", err) return err } - fmt.Println() log.Info("Ancient db meta info", "persistent_state_id", nf.persistID, "head_state_id", head, "tail_state_id", tail, "waiting_recover_num", head-nf.persistID) @@ -841,8 +837,8 @@ func (nf *nodebufferlist) proposedBlockReader(blockRoot common.Hash) (layer, err func (nf *nodebufferlist) report() { context := []interface{}{ "number", nf.block, "count", nf.count, "layers", nf.layers, - "stateid", nf.stateId, "persist", nf.persistID, "size", common.StorageSize(nf.size), - "basesize", common.StorageSize(nf.base.size), "baselayers", nf.base.layers, + "state_id", nf.stateId, "persist", nf.persistID, "size", common.StorageSize(nf.size), + "base_size", common.StorageSize(nf.base.size), "base_layers", nf.base.layers, } log.Info("node buffer list info", context...) } From 5a1697604b418a6bcdce7cfe8b23d666b4a43350 Mon Sep 17 00:00:00 2001 From: VM Date: Mon, 18 Nov 2024 11:16:06 +0800 Subject: [PATCH 3/5] fix: optimize logs --- core/rawdb/freezer.go | 2 +- eth/backend.go | 26 ++++++++++++++------------ triedb/pathdb/database_test.go | 2 -- triedb/pathdb/journal.go | 33 +++++++-------------------------- triedb/pathdb/nodebufferlist.go | 18 +++++++++++++++--- 5 files changed, 37 insertions(+), 44 deletions(-) diff --git a/core/rawdb/freezer.go b/core/rawdb/freezer.go index 54b911f1cb..ed0d582380 100644 --- a/core/rawdb/freezer.go +++ b/core/rawdb/freezer.go @@ -127,7 +127,7 @@ func NewFreezer(datadir string, namespace string, readonly, writeTrieNode bool, // Create the tables. for name, disableSnappy := range tables { if name == stateHistoryTrieNodesData && !writeTrieNode { - log.Info("Not create trie node data") + log.Info("Not create trie node data in freezer db") continue } table, err := newTable(datadir, name, readMeter, writeMeter, sizeGauge, maxTableSize, disableSnappy, readonly) diff --git a/eth/backend.go b/eth/backend.go index 380498c526..3db68fe8e3 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -40,6 +40,7 @@ import ( "github.com/ethereum/go-ethereum/core/state/pruner" "github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/txpool/blobpool" + "github.com/ethereum/go-ethereum/core/txpool/bundlepool" "github.com/ethereum/go-ethereum/core/txpool/legacypool" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" @@ -134,6 +135,17 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { config.Miner.GasPrice = new(big.Int).Set(ethconfig.Defaults.Miner.GasPrice) } + // Assemble the Ethereum object + chainDb, err := stack.OpenAndMergeDatabase("chaindata", ChainDBNamespace, false, config.DatabaseCache, config.DatabaseHandles, + config.DatabaseFreezer) + if err != nil { + return nil, err + } + config.StateScheme, err = rawdb.ParseStateScheme(config.StateScheme, chainDb) + if err != nil { + return nil, err + } + if config.StateScheme == rawdb.HashScheme && config.NoPruning && config.TrieDirtyCache > 0 { if config.SnapshotCache > 0 { config.TrieCleanCache += config.TrieDirtyCache * 3 / 5 @@ -153,21 +165,11 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { config.TrieCleanCache += config.TrieDirtyCache - pathdb.MaxBufferSize/1024/1024 config.TrieDirtyCache = pathdb.MaxBufferSize / 1024 / 1024 } - log.Info("Allocated memory caches", - "state_scheme", config.StateScheme, + log.Info("Allocated memory caches", "state_scheme", config.StateScheme, "trie_clean_cache", common.StorageSize(config.TrieCleanCache)*1024*1024, "trie_dirty_cache", common.StorageSize(config.TrieDirtyCache)*1024*1024, "snapshot_cache", common.StorageSize(config.SnapshotCache)*1024*1024) - // Assemble the Ethereum object - chainDb, err := stack.OpenAndMergeDatabase("chaindata", ChainDBNamespace, false, config.DatabaseCache, config.DatabaseHandles, - config.DatabaseFreezer) - if err != nil { - return nil, err - } - config.StateScheme, err = rawdb.ParseStateScheme(config.StateScheme, chainDb) - if err != nil { - return nil, err - } + // Try to recover offline state pruning only in hash-based. if config.StateScheme == rawdb.HashScheme { if err := pruner.RecoverPruning(stack.ResolvePath(""), chainDb); err != nil { diff --git a/triedb/pathdb/database_test.go b/triedb/pathdb/database_test.go index 3674064cf3..c823f05291 100644 --- a/triedb/pathdb/database_test.go +++ b/triedb/pathdb/database_test.go @@ -319,7 +319,6 @@ func (t *tester) verifyState(root common.Hash) error { } _, err = reader.Node(common.Hash{}, nil, root) if err != nil { - fmt.Println("error: ", err) return errors.New("root node is not available") } for addrHash, account := range t.snapAccounts[root] { @@ -460,7 +459,6 @@ func TestDisable(t *testing.T) { t.Fatalf("Invalid activation should be rejected") } if err := tester.db.Enable(stored); err != nil { - fmt.Println(err) t.Fatal("Failed to activate database") } diff --git a/triedb/pathdb/journal.go b/triedb/pathdb/journal.go index 4579bb41aa..fab4495d88 100644 --- a/triedb/pathdb/journal.go +++ b/triedb/pathdb/journal.go @@ -250,7 +250,6 @@ func (db *Database) loadLayers() layer { _, root := rawdb.ReadAccountTrieNode(db.diskdb, nil) root = types.TrieRootHash(root) - fmt.Println("1 useBase, fastRecovery", db.useBase, db.fastRecovery) // Load the layers by resolving the journal head, err := db.loadJournal(root) if err == nil { @@ -269,7 +268,6 @@ func (db *Database) loadLayers() layer { stateID = rawdb.ReadPersistentStateID(db.diskdb) ) - fmt.Println("2 useBase, fastRecovery", db.useBase, db.fastRecovery) if (errors.Is(err, errMissJournal) || errors.Is(err, errUnmatchedJournal)) && db.fastRecovery && db.config.TrieNodeBufferType == NodeBufferList && !db.useBase { start := time.Now() @@ -332,23 +330,13 @@ func (db *Database) loadDiskLayer(r *rlp.Stream, journalTypeForReader JournalTyp if stored > id { return nil, fmt.Errorf("invalid state id: stored %d resolved %d", stored, id) } + // Resolve nodes cached in node buffer var encoded []journalNodes if err := journalBuf.Decode(&encoded); err != nil { return nil, fmt.Errorf("failed to load disk nodes: %v", err) } - nodes := make(map[common.Hash]map[string]*trienode.Node) - for _, entry := range encoded { - subset := make(map[string]*trienode.Node) - for _, n := range entry.Nodes { - if len(n.Blob) > 0 { - subset[string(n.Path)] = trienode.New(crypto.Keccak256Hash(n.Blob), n.Blob) - } else { - subset[string(n.Path)] = trienode.NewDeleted() - } - } - nodes[entry.Owner] = subset - } + nodes := flattenTrieNodes(encoded) if journalTypeForReader == JournalFileType { var shaSum [32]byte @@ -362,7 +350,6 @@ func (db *Database) loadDiskLayer(r *rlp.Stream, journalTypeForReader JournalTyp } } - fmt.Println("3 useBase, fastRecovery", db.useBase, db.fastRecovery) // Calculate the internal state transitions by id difference. nb, err := NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nodes, id-stored, db.config.ProposeBlockInterval, db.config.NotifyKeep, db.freezer, db.fastRecovery, db.useBase) @@ -371,14 +358,15 @@ func (db *Database) loadDiskLayer(r *rlp.Stream, journalTypeForReader JournalTyp return nil, err } - if db.config.TrieNodeBufferType == NodeBufferList && !db.useBase { + if db.config.TrieNodeBufferType == NodeBufferList && !db.useBase && db.fastRecovery { recoveredRoot, recoveredStateID, _ := nb.getLatestStatus() if recoveredRoot != root && recoveredStateID != id { - log.Error("unequal state root and state id") + log.Error("Recovered state root and state id are different from recording ones", + "recovered_root", recoveredRoot, "root", root, "recovered_state_id", recoveredStateID, "id", id) return nil, errors.New("Unmatched root and state id with recovered") } - log.Info("Finish recovering node buffer list", "latest root hash", recoveredRoot.String(), + log.Info("Disk layer finishes recovering node buffer list", "latest root hash", recoveredRoot.String(), "latest state_id", recoveredStateID) } @@ -498,14 +486,7 @@ func (dl *diskLayer) journal(w io.Writer, journalType JournalType) error { } // Step three, write all unwritten nodes into the journal bufferNodes := dl.buffer.getAllNodes() - nodes := make([]journalNodes, 0, len(bufferNodes)) - for owner, subset := range bufferNodes { - entry := journalNodes{Owner: owner} - for path, node := range subset { - entry.Nodes = append(entry.Nodes, journalNode{Path: []byte(path), Blob: node.Blob}) - } - nodes = append(nodes, entry) - } + nodes := compressTrieNodes(bufferNodes) if err := rlp.Encode(journalBuf, nodes); err != nil { return err } diff --git a/triedb/pathdb/nodebufferlist.go b/triedb/pathdb/nodebufferlist.go index d1b43d70e0..0fe2b1c687 100644 --- a/triedb/pathdb/nodebufferlist.go +++ b/triedb/pathdb/nodebufferlist.go @@ -108,13 +108,26 @@ func newNodeBufferList( dlInMd = wpBlocks } + var base *multiDifflayer + if nodes != nil && !fastRecovery { + var size uint64 + for _, subset := range nodes { + for path, n := range subset { + size += uint64(len(n.Blob) + len(path)) + } + } + base = newMultiDifflayer(limit, size, common.Hash{}, nodes, layers) + } else { + base = newMultiDifflayer(limit, 0, common.Hash{}, make(map[common.Hash]map[string]*trienode.Node), 0) + } + nf := &nodebufferlist{ db: db, wpBlocks: wpBlocks, rsevMdNum: rsevMdNum, dlInMd: dlInMd, limit: limit, - base: newMultiDifflayer(limit, 0, common.Hash{}, make(map[common.Hash]map[string]*trienode.Node), 0), + base: base, persistID: rawdb.ReadPersistentStateID(db), stopCh: make(chan struct{}), waitStopCh: make(chan struct{}), @@ -123,7 +136,6 @@ func newNodeBufferList( keepFunc: keepFunc, } - fmt.Println("useBase, fastRecovery", useBase, fastRecovery) if !useBase && fastRecovery { if freezer == nil { log.Crit("Use unopened freezer db to recover node buffer list") @@ -213,7 +225,7 @@ func (nf *nodebufferlist) recoverNodeBufferList(freezer *rawdb.ResettableFreezer } nf.diffToBase() - log.Info("Succeed to add diff layer", "base_size", nf.base.size, "tail_state_id", nf.tail.id, + log.Info("Succeed to recover node buffer list", "base_size", nf.base.size, "tail_state_id", nf.tail.id, "head_state_id", nf.head.id, "nbl_layers", nf.layers, "base_layers", nf.base.layers) return nil } From 1f6aea3ee96010b313b92cba1d0d33e2de836760 Mon Sep 17 00:00:00 2001 From: VM Date: Mon, 23 Dec 2024 16:28:28 +0800 Subject: [PATCH 4/5] fix: flush base buffer after recovering node buffer list --- triedb/pathdb/nodebufferlist.go | 1 + 1 file changed, 1 insertion(+) diff --git a/triedb/pathdb/nodebufferlist.go b/triedb/pathdb/nodebufferlist.go index 0fe2b1c687..67ca59cbc7 100644 --- a/triedb/pathdb/nodebufferlist.go +++ b/triedb/pathdb/nodebufferlist.go @@ -224,6 +224,7 @@ func (nf *nodebufferlist) recoverNodeBufferList(freezer *rawdb.ResettableFreezer nf.layers += current.layers } nf.diffToBase() + nf.backgroundFlush() log.Info("Succeed to recover node buffer list", "base_size", nf.base.size, "tail_state_id", nf.tail.id, "head_state_id", nf.head.id, "nbl_layers", nf.layers, "base_layers", nf.base.layers) From 3860a1be3138428036f16a90dd46c0b00a66d7fc Mon Sep 17 00:00:00 2001 From: VM Date: Fri, 10 Jan 2025 11:58:39 +0800 Subject: [PATCH 5/5] fix: skip node buffer count check when recovering --- eth/backend.go | 1 - triedb/pathdb/database_test.go | 1 + triedb/pathdb/nodebufferlist.go | 24 +++++++++++++++++------- 3 files changed, 18 insertions(+), 8 deletions(-) diff --git a/eth/backend.go b/eth/backend.go index 3db68fe8e3..28fc33d51d 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -40,7 +40,6 @@ import ( "github.com/ethereum/go-ethereum/core/state/pruner" "github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/txpool/blobpool" - "github.com/ethereum/go-ethereum/core/txpool/bundlepool" "github.com/ethereum/go-ethereum/core/txpool/legacypool" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" diff --git a/triedb/pathdb/database_test.go b/triedb/pathdb/database_test.go index c823f05291..a51f339be6 100644 --- a/triedb/pathdb/database_test.go +++ b/triedb/pathdb/database_test.go @@ -516,6 +516,7 @@ func TestJournal(t *testing.T) { } tester.db.Close() pathConfig := Defaults + pathConfig.UseBase = true tester.db = New(tester.db.diskdb, pathConfig) // Verify states including disk layer and all diff on top. diff --git a/triedb/pathdb/nodebufferlist.go b/triedb/pathdb/nodebufferlist.go index 67ca59cbc7..a05e90868c 100644 --- a/triedb/pathdb/nodebufferlist.go +++ b/triedb/pathdb/nodebufferlist.go @@ -109,7 +109,9 @@ func newNodeBufferList( } var base *multiDifflayer - if nodes != nil && !fastRecovery { + if nodes != nil && useBase { + // after using fast recovery, use ancient db to recover nbl for force kill and graceful kill. + // so this case for now is used in unit test var size uint64 for _, subset := range nodes { for path, n := range subset { @@ -135,6 +137,7 @@ func newNodeBufferList( waitForceKeepCh: make(chan struct{}), keepFunc: keepFunc, } + nf.useBase.Store(useBase) if !useBase && fastRecovery { if freezer == nil { @@ -151,7 +154,6 @@ func newNodeBufferList( nf.tail = ele nf.count = 1 } - nf.useBase.Store(useBase) go nf.loop() @@ -223,7 +225,7 @@ func (nf *nodebufferlist) recoverNodeBufferList(freezer *rawdb.ResettableFreezer nf.size += current.size nf.layers += current.layers } - nf.diffToBase() + nf.diffToBase(true) nf.backgroundFlush() log.Info("Succeed to recover node buffer list", "base_size", nf.base.size, "tail_state_id", nf.tail.id, @@ -676,16 +678,23 @@ func (nf *nodebufferlist) traverseReverse(cb func(*multiDifflayer) bool) { // diffToBase calls traverseReverse and merges the multiDifflayer's nodes to // base node buffer, if up to limit size and flush to disk. It is called // periodically in the background -func (nf *nodebufferlist) diffToBase() { +func (nf *nodebufferlist) diffToBase(skipCountCheck bool) { + count := 0 commitFunc := func(buffer *multiDifflayer) bool { if nf.base.size >= nf.base.limit { log.Debug("base node buffer need write disk immediately") return false } - if nf.count <= nf.rsevMdNum { - log.Debug("node buffer list less, waiting more difflayer to be committed") + if skipCountCheck && count == 1 { // only force flush one buffer to base return false } + if !skipCountCheck { + // when using fast recovery, force flush one buffer to base to avoid exceeding pebble batch size limit + if nf.count <= nf.rsevMdNum { + log.Debug("node buffer list less, waiting more difflayer to be committed") + return false + } + } if buffer.block%nf.dlInMd != 0 { log.Crit("committed block number misaligned", "block", buffer.block) } @@ -724,6 +733,7 @@ func (nf *nodebufferlist) diffToBase() { baseNodeBufferDifflayerAvgSize.Update(int64(nf.base.size / nf.base.layers)) } nf.report() + count++ return true } @@ -804,7 +814,7 @@ func (nf *nodebufferlist) loop() { if nf.isFlushing.Swap(true) { continue } - nf.diffToBase() + nf.diffToBase(false) if nf.base.size >= nf.base.limit { nf.backgroundFlush() }