Skip to content

Commit

Permalink
eth/filters: implement log filter using new log index
Browse files Browse the repository at this point in the history
  • Loading branch information
zsfelfoldi committed Feb 4, 2025
1 parent 02ba69c commit 2c015a0
Show file tree
Hide file tree
Showing 14 changed files with 612 additions and 196 deletions.
3 changes: 3 additions & 0 deletions cmd/geth/chaincmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ if one is set. Otherwise it prints the genesis from the datadir.`,
utils.VMTraceFlag,
utils.VMTraceJsonConfigFlag,
utils.TransactionHistoryFlag,
utils.LogHistoryFlag,
utils.LogNoHistoryFlag,
utils.LogExportCheckpointsFlag,
utils.StateHistoryFlag,
}, utils.DatabaseFlags),
Description: `
Expand Down
3 changes: 3 additions & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ var (
utils.SnapshotFlag,
utils.TxLookupLimitFlag, // deprecated
utils.TransactionHistoryFlag,
utils.LogHistoryFlag,
utils.LogNoHistoryFlag,
utils.LogExportCheckpointsFlag,
utils.StateHistoryFlag,
utils.LightServeFlag, // deprecated
utils.LightIngressFlag, // deprecated
Expand Down
26 changes: 26 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,23 @@ var (
Value: ethconfig.Defaults.TransactionHistory,
Category: flags.StateCategory,
}
LogHistoryFlag = &cli.Uint64Flag{
Name: "history.logs",
Usage: "Number of recent blocks to maintain log search index for (default = about one year, 0 = entire chain)",
Value: ethconfig.Defaults.LogHistory,
Category: flags.StateCategory,
}
LogNoHistoryFlag = &cli.BoolFlag{
Name: "history.logs.disable",
Usage: "Do not maintain log search index",
Category: flags.StateCategory,
}
LogExportCheckpointsFlag = &cli.StringFlag{
Name: "history.logs.export",
Usage: "Export checkpoints to file in go source file format",
Category: flags.StateCategory,
Value: "",
}
// Beacon client light sync settings
BeaconApiFlag = &cli.StringSliceFlag{
Name: "beacon.api",
Expand Down Expand Up @@ -1662,6 +1679,15 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
cfg.StateScheme = rawdb.HashScheme
log.Warn("Forcing hash state-scheme for archive mode")
}
if ctx.IsSet(LogHistoryFlag.Name) {
cfg.LogHistory = ctx.Uint64(LogHistoryFlag.Name)
}
if ctx.IsSet(LogNoHistoryFlag.Name) {
cfg.LogNoHistory = true
}
if ctx.IsSet(LogExportCheckpointsFlag.Name) {
cfg.LogExportCheckpoints = ctx.String(LogExportCheckpointsFlag.Name)
}
if ctx.IsSet(CacheFlag.Name) || ctx.IsSet(CacheTrieFlag.Name) {
cfg.TrieCleanCache = ctx.Int(CacheFlag.Name) * ctx.Int(CacheTrieFlag.Name) / 100
}
Expand Down
29 changes: 19 additions & 10 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,14 +225,15 @@ type BlockChain struct {
statedb *state.CachingDB // State database to reuse between imports (contains state cache)
txIndexer *txIndexer // Transaction indexer, might be nil if not enabled

hc *HeaderChain
rmLogsFeed event.Feed
chainFeed event.Feed
chainHeadFeed event.Feed
logsFeed event.Feed
blockProcFeed event.Feed
scope event.SubscriptionScope
genesisBlock *types.Block
hc *HeaderChain
rmLogsFeed event.Feed
chainFeed event.Feed
chainHeadFeed event.Feed
logsFeed event.Feed
blockProcFeed event.Feed
blockProcCounter int32
scope event.SubscriptionScope
genesisBlock *types.Block

// This mutex synchronizes chain write operations.
// Readers don't need to take it, they can just read the database.
Expand Down Expand Up @@ -1564,8 +1565,6 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
if len(chain) == 0 {
return 0, nil
}
bc.blockProcFeed.Send(true)
defer bc.blockProcFeed.Send(false)

// Do a sanity check that the provided chain is actually ordered and linked.
for i := 1; i < len(chain); i++ {
Expand Down Expand Up @@ -1605,6 +1604,16 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool, makeWitness
if bc.insertStopped() {
return nil, 0, nil
}

if atomic.AddInt32(&bc.blockProcCounter, 1) == 1 {
bc.blockProcFeed.Send(true)
}
defer func() {
if atomic.AddInt32(&bc.blockProcCounter, -1) == 0 {
bc.blockProcFeed.Send(false)
}
}()

// Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss)
SenderCacher().RecoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number(), chain[0].Time()), chain)

Expand Down
5 changes: 5 additions & 0 deletions eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/ethereum/go-ethereum/consensus/misc/eip4844"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/filtermaps"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/txpool"
Expand Down Expand Up @@ -404,6 +405,10 @@ func (b *EthAPIBackend) ServiceFilter(ctx context.Context, session *bloombits.Ma
}
}

func (b *EthAPIBackend) NewMatcherBackend() filtermaps.MatcherBackend {
return b.eth.filterMaps.NewMatcherBackend()
}

func (b *EthAPIBackend) Engine() consensus.Engine {
return b.eth.engine
}
Expand Down
115 changes: 115 additions & 0 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@ import (
"math/big"
"runtime"
"sync"
"time"

"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/filtermaps"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state/pruner"
"github.com/ethereum/go-ethereum/core/txpool"
Expand Down Expand Up @@ -85,6 +87,9 @@ type Ethereum struct {
bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports
closeBloomHandler chan struct{}

filterMaps *filtermaps.FilterMaps
closeFilterMaps chan chan struct{}

APIBackend *EthAPIBackend

miner *miner.Miner
Expand Down Expand Up @@ -222,6 +227,8 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
return nil, err
}
eth.bloomIndexer.Start(eth.blockchain)
eth.filterMaps = filtermaps.NewFilterMaps(chainDb, eth.newChainView(eth.blockchain.CurrentBlock()), filtermaps.DefaultParams, config.LogHistory, 1000, config.LogNoHistory, config.LogExportCheckpoints)
eth.closeFilterMaps = make(chan chan struct{})

if config.BlobPool.Datadir != "" {
config.BlobPool.Datadir = stack.ResolvePath(config.BlobPool.Datadir)
Expand Down Expand Up @@ -364,9 +371,113 @@ func (s *Ethereum) Start() error {

// Start the networking layer
s.handler.Start(s.p2pServer.MaxPeers)

// start log indexer
s.filterMaps.Start()
go s.updateFilterMapsHeads()
return nil
}

func (s *Ethereum) newChainView(head *types.Header) *filtermaps.StoredChainView {
if head == nil {
return nil
}
return filtermaps.NewStoredChainView(s.blockchain, head.Number.Uint64(), head.Hash())
}

func (s *Ethereum) updateFilterMapsHeads() {
headEventCh := make(chan core.ChainEvent, 10)
blockProcCh := make(chan bool, 10)
sub := s.blockchain.SubscribeChainEvent(headEventCh)
sub2 := s.blockchain.SubscribeBlockProcessingEvent(blockProcCh)
defer func() {
sub.Unsubscribe()
sub2.Unsubscribe()
for {
select {
case <-headEventCh:
case <-blockProcCh:
default:
return
}
}
}()

head := s.blockchain.CurrentBlock()
targetView := s.newChainView(head) // nil if already sent to channel
var (
blockProc, lastBlockProc bool
finalBlock, lastFinal uint64
)

setHead := func(newHead *types.Header) {
if newHead == nil {
return
}
if head == nil || newHead.Hash() != head.Hash() {
head = newHead
targetView = s.newChainView(head)
}
if fb := s.blockchain.CurrentFinalBlock(); fb != nil {
finalBlock = fb.Number.Uint64()
}
}

for {
if blockProc != lastBlockProc {
select {
case s.filterMaps.BlockProcessingCh <- blockProc:
lastBlockProc = blockProc
case ev := <-headEventCh:
setHead(ev.Header)
case blockProc = <-blockProcCh:
case <-time.After(time.Second * 10):
setHead(s.blockchain.CurrentBlock())
case ch := <-s.closeFilterMaps:
close(ch)
return
}
} else if targetView != nil {
select {
case s.filterMaps.TargetViewCh <- targetView:
targetView = nil
case ev := <-headEventCh:
setHead(ev.Header)
case blockProc = <-blockProcCh:
case <-time.After(time.Second * 10):
setHead(s.blockchain.CurrentBlock())
case ch := <-s.closeFilterMaps:
close(ch)
return
}
} else if finalBlock != lastFinal {
select {
case s.filterMaps.FinalBlockCh <- finalBlock:
lastFinal = finalBlock
case ev := <-headEventCh:
setHead(ev.Header)
case blockProc = <-blockProcCh:
case <-time.After(time.Second * 10):
setHead(s.blockchain.CurrentBlock())
case ch := <-s.closeFilterMaps:
close(ch)
return
}
} else {
select {
case ev := <-headEventCh:
setHead(ev.Header)
case <-time.After(time.Second * 10):
setHead(s.blockchain.CurrentBlock())
case blockProc = <-blockProcCh:
case ch := <-s.closeFilterMaps:
close(ch)
return
}
}
}
}

func (s *Ethereum) setupDiscovery() error {
eth.StartENRUpdater(s.blockchain, s.p2pServer.LocalNode())

Expand Down Expand Up @@ -409,6 +520,10 @@ func (s *Ethereum) Stop() error {
// Then stop everything else.
s.bloomIndexer.Close()
close(s.closeBloomHandler)
ch := make(chan struct{})
s.closeFilterMaps <- ch
<-ch
s.filterMaps.Stop()
s.txPool.Close()
s.blockchain.Stop()
s.engine.Close()
Expand Down
8 changes: 6 additions & 2 deletions eth/ethconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ var Defaults = Config{
NetworkId: 0, // enable auto configuration of networkID == chainID
TxLookupLimit: 2350000,
TransactionHistory: 2350000,
LogHistory: 2350000,
StateHistory: params.FullImmutabilityThreshold,
DatabaseCache: 512,
TrieCleanCache: 154,
Expand Down Expand Up @@ -93,8 +94,11 @@ type Config struct {
// Deprecated: use 'TransactionHistory' instead.
TxLookupLimit uint64 `toml:",omitempty"` // The maximum number of blocks from head whose tx indices are reserved.

TransactionHistory uint64 `toml:",omitempty"` // The maximum number of blocks from head whose tx indices are reserved.
StateHistory uint64 `toml:",omitempty"` // The maximum number of blocks from head whose state histories are reserved.
TransactionHistory uint64 `toml:",omitempty"` // The maximum number of blocks from head whose tx indices are reserved.
LogHistory uint64 `toml:",omitempty"` // The maximum number of blocks from head where a log search index is maintained.
LogNoHistory bool `toml:",omitempty"` // No log search index is maintained.
LogExportCheckpoints string // export log index checkpoints to file
StateHistory uint64 `toml:",omitempty"` // The maximum number of blocks from head whose state histories are reserved.

// State scheme represents the scheme used to store ethereum states and trie
// nodes on top. It can be 'hash', 'path', or none which means use the scheme
Expand Down
Loading

0 comments on commit 2c015a0

Please sign in to comment.