Skip to content

Commit

Permalink
Txpool optimization: broadcast transaction body directly to peer (#243)
Browse files Browse the repository at this point in the history
Co-authored-by: Owen <[email protected]>
Co-authored-by: andyzhang2023 <[email protected]>
  • Loading branch information
3 people authored Jan 9, 2025
1 parent f26348b commit 9c779d5
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 5 deletions.
4 changes: 3 additions & 1 deletion eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ import (
"context"
"errors"
"fmt"
"github.com/ethereum/go-ethereum/core/txpool/bundlepool"
"math/big"
"runtime"
"sync"
"time"

"github.com/ethereum/go-ethereum/core/txpool/bundlepool"

"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
Expand Down Expand Up @@ -324,6 +325,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
// Permit the downloader to use the trie cache allowance during fast sync
cacheLimit := cacheConfig.TrieCleanLimit + cacheConfig.TrieDirtyLimit + cacheConfig.SnapshotLimit
if eth.handler, err = newHandler(&handlerConfig{
DirectNodes: stack.Config().P2P.DirectNodes,
Database: chainDb,
Chain: eth.blockchain,
TxPool: eth.txPool,
Expand Down
39 changes: 37 additions & 2 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/triedb/pathdb"
)

Expand Down Expand Up @@ -88,6 +89,7 @@ type txPool interface {
// handlerConfig is the collection of initialization parameters to create a full
// node network handler.
type handlerConfig struct {
DirectNodes []*enode.Node
Database ethdb.Database // Database for direct sync insertions
Chain *core.BlockChain // Blockchain to serve data from
TxPool txPool // Transaction pool to propagate from
Expand Down Expand Up @@ -137,6 +139,8 @@ type handler struct {

handlerStartCh chan struct{}
handlerDoneCh chan struct{}

directNodes map[string]struct{}
}

// newHandler returns a handler for all Ethereum chain management protocol.
Expand All @@ -159,7 +163,12 @@ func newHandler(config *handlerConfig) (*handler, error) {
quitSync: make(chan struct{}),
handlerDoneCh: make(chan struct{}),
handlerStartCh: make(chan struct{}),
directNodes: make(map[string]struct{}),
}
for _, node := range config.DirectNodes {
h.directNodes[node.ID().String()] = struct{}{}
}

if config.Sync == downloader.FullSync {
// The database seems empty as the current block is the genesis. Yet the snap
// block is ahead, so snap sync was enabled for this node at a certain point.
Expand Down Expand Up @@ -620,6 +629,29 @@ func (h *handler) BroadcastBlock(block *types.Block, propagate bool) {
}
}

func (h *handler) peersForBroadcasting(numDirect int, peers []*ethPeer) (direct []*ethPeer, announce []*ethPeer) {
// Split the peers into direct-peers and announce-peers
// we send the tx directly to direct-peers
// we announce the tx to announce-peers
direct = make([]*ethPeer, 0, numDirect)
announce = make([]*ethPeer, 0, len(peers)-numDirect)
for _, peer := range peers {
if _, ok := h.directNodes[peer.ID()]; ok {
direct = append(direct, peer)
} else {
announce = append(announce, peer)
}
}

// if directly-peers are not enough, move some announce-peers into directly pool
for len(direct) < numDirect && len(announce) > 0 {
// shift one peer to trusted
direct = append(direct, announce[0])
announce = announce[1:]
}
return direct, announce
}

// BroadcastTransactions will propagate a batch of transactions
// - To a square root of all peers for non-blob transactions
// - And, separately, as announcements to all peers which are not known to
Expand All @@ -642,21 +674,24 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) {
peers := h.peers.peersWithoutTransaction(tx.Hash())

var numDirect int
var direct, announce []*ethPeer = nil, peers
switch {
case tx.Type() == types.BlobTxType:
blobTxs++
case tx.Size() > txMaxBroadcastSize:
largeTxs++
default:
numDirect = int(math.Sqrt(float64(len(peers))))
direct, announce = h.peersForBroadcasting(numDirect, peers)
}

// Send the tx unconditionally to a subset of our peers
for _, peer := range peers[:numDirect] {
for _, peer := range direct {
txset[peer] = append(txset[peer], tx.Hash())
log.Trace("Broadcast transaction", "peer", peer.ID(), "hash", tx.Hash())
}
// For the remaining peers, send announcement only
for _, peer := range peers[numDirect:] {
for _, peer := range announce {
annos[peer] = append(annos[peer], tx.Hash())
log.Trace("Announce transaction", "peer", peer.ID(), "hash", tx.Hash())
}
Expand Down
4 changes: 2 additions & 2 deletions internal/ethapi/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2000,9 +2000,9 @@ func SubmitTransaction(ctx context.Context, b Backend, tx *types.Transaction) (c

if tx.To() == nil {
addr := crypto.CreateAddress(from, tx.Nonce())
log.Info("Submitted contract creation", "hash", tx.Hash().Hex(), "from", from, "nonce", tx.Nonce(), "contract", addr.Hex(), "value", tx.Value())
log.Debug("Submitted contract creation", "hash", tx.Hash().Hex(), "from", from, "nonce", tx.Nonce(), "contract", addr.Hex(), "value", tx.Value())
} else {
log.Info("Submitted transaction", "hash", tx.Hash().Hex(), "from", from, "nonce", tx.Nonce(), "recipient", tx.To(), "value", tx.Value())
log.Debug("Submitted transaction", "hash", tx.Hash().Hex(), "from", from, "nonce", tx.Nonce(), "recipient", tx.To(), "value", tx.Value())
}
return tx.Hash(), nil
}
Expand Down
3 changes: 3 additions & 0 deletions p2p/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ type Config struct {
// maintained and re-connected on disconnects.
StaticNodes []*enode.Node

// Direct nodes are static nodes who will always reveived transactions body rather than just the hashes.
DirectNodes []*enode.Node

// Trusted nodes are used as pre-configured connections which are always
// allowed to connect, even above the peer limit.
TrustedNodes []*enode.Node
Expand Down

0 comments on commit 9c779d5

Please sign in to comment.