Skip to content

Commit

Permalink
Added p2p helper and benchmark tests for local txns in the pool
Browse files Browse the repository at this point in the history
  • Loading branch information
eastorski committed Jan 22, 2025
1 parent 0d300af commit c0a66e4
Show file tree
Hide file tree
Showing 2 changed files with 370 additions and 0 deletions.
201 changes: 201 additions & 0 deletions tests/txpool/helper/p2p_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
package helper

import (
"context"
"encoding/json"
"net/http"
"strconv"
"strings"
"time"

"github.com/erigontech/erigon-lib/crypto"
"github.com/erigontech/erigon-lib/log/v3"
"github.com/erigontech/erigon/cmd/utils"
"github.com/erigontech/erigon/eth/protocols/eth"
"github.com/erigontech/erigon/p2p"
"github.com/erigontech/erigon/p2p/nat"
"github.com/erigontech/erigon/p2p/sentry"
"github.com/holiman/uint256"

libcommon "github.com/erigontech/erigon-lib/common"
"github.com/erigontech/erigon-lib/direct"
"github.com/erigontech/erigon-lib/gointerfaces"
"github.com/erigontech/erigon-lib/gointerfaces/sentryproto"
)

var (
txChanSize = 5000
)

type TxMessage struct {
MessageID sentryproto.MessageId
}

type p2pClient struct {
adminRPC string
}

// connect adminRPC
// Example: http://127.0.0.1:8545/
func NewP2P(adminRPC string) *p2pClient {
return &p2pClient{
adminRPC: adminRPC,
}
}

func (p *p2pClient) Connect() (<-chan TxMessage, <-chan error, error) {
privateKey, err := crypto.GenerateKey()
if err != nil {
return nil, nil, err
}

cfg := &p2p.Config{
ListenAddr: ":30307",
AllowedPorts: []uint{30303, 30304, 30305, 30306, 30307},
ProtocolVersion: []uint{direct.ETH68, direct.ETH67},
MaxPeers: 32,
MaxPendingPeers: 1000,
NAT: nat.Any(),
NoDiscovery: true,
Name: "p2p-mock",
NodeDatabase: "dev/nodes/eth67",
PrivateKey: privateKey,
}

r, err := http.Post(p.adminRPC, "application/json", strings.NewReader(
`{"jsonrpc":"2.0","method":"admin_nodeInfo","params":[],"id":1}`,
))
if err != nil {
return nil, nil, err
}
defer r.Body.Close()

var resp struct {
Result struct {
Enode string `json:"enode"`
Protocols struct {
Eth struct {
Genesis string `json:"genesis"`
Network int `json:"network"`
Difficulty int `json:"difficulty"`
} `json:"eth"`
} `json:"protocols"`
} `json:"result"`
}

if err := json.NewDecoder(r.Body).Decode(&resp); err != nil {
return nil, nil, err
}

if cfg.StaticNodes, err = utils.ParseNodesFromURLs([]string{resp.Result.Enode}); err != nil {
return nil, nil, err
}

grpcServer := sentry.NewGrpcServer(context.TODO(), nil, func() *eth.NodeInfo { return nil }, cfg, direct.ETH68, log.New())
sentry := direct.NewSentryClientDirect(direct.ETH68, grpcServer)

_, err = sentry.SetStatus(context.TODO(), &sentryproto.StatusData{
NetworkId: uint64(resp.Result.Protocols.Eth.Network),
TotalDifficulty: gointerfaces.ConvertUint256IntToH256(uint256.MustFromDecimal(strconv.Itoa(resp.Result.Protocols.Eth.Difficulty))),
BestHash: gointerfaces.ConvertHashToH256(
[32]byte(libcommon.FromHex(resp.Result.Protocols.Eth.Genesis)),
),
ForkData: &sentryproto.Forks{
Genesis: gointerfaces.ConvertHashToH256(
[32]byte(libcommon.FromHex(resp.Result.Protocols.Eth.Genesis)),
),
},
})
if err != nil {
return nil, nil, err
}

conn, err := sentry.Messages(context.TODO(), &sentryproto.MessagesRequest{
Ids: []sentryproto.MessageId{
sentryproto.MessageId_NEW_POOLED_TRANSACTION_HASHES_66,
sentryproto.MessageId_GET_POOLED_TRANSACTIONS_66,
sentryproto.MessageId_TRANSACTIONS_66,
sentryproto.MessageId_POOLED_TRANSACTIONS_66,
sentryproto.MessageId_NEW_POOLED_TRANSACTION_HASHES_68,
},
})
if err != nil {
return nil, nil, err
}

gotTxCh := make(chan TxMessage, txChanSize)
errCh := make(chan error)

ready, err := p.notifyWhenReady()
if err != nil {
return nil, nil, err
}

go p.serve(conn, gotTxCh, errCh)
<-ready

return gotTxCh, errCh, nil
}

func (p *p2pClient) notifyWhenReady() (<-chan struct{}, error) {
ready := make(chan struct{})

r, err := http.Post(p.adminRPC, "application/json", strings.NewReader(
`{"jsonrpc":"2.0","method":"admin_peers","params":[],"id":1}`,
))
if err != nil {
return nil, err
}
defer r.Body.Close()

var resp struct {
Result []struct {
Enode string `json:"enode"`
} `json:"result"`
}

if err := json.NewDecoder(r.Body).Decode(&resp); err != nil {
return nil, err
}

numConn := len(resp.Result)

go func() {
for {
time.Sleep(100 * time.Millisecond)

r, err := http.Post(p.adminRPC, "application/json", strings.NewReader(
`{"jsonrpc":"2.0","method":"admin_peers","params":[],"id":1}`,
))
if err != nil {
continue
}
defer r.Body.Close()

if err := json.NewDecoder(r.Body).Decode(&resp); err != nil {
continue
}

if len(resp.Result) > numConn {
ready <- struct{}{}
return
}
}
}()

return ready, nil
}

func (p *p2pClient) serve(conn sentryproto.Sentry_MessagesClient, gotTxCh chan<- TxMessage, errCh chan<- error) {
for {
req, err := conn.Recv()
if err != nil {
errCh <- err
continue
}

gotTxCh <- TxMessage{
MessageID: req.Id,
}
}
}
169 changes: 169 additions & 0 deletions tests/txpool/pool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
//go:build integration

package txpool

import (
"fmt"
"math/big"
"testing"
"time"

"github.com/erigontech/erigon-lib/crypto"
"github.com/erigontech/erigon-lib/gointerfaces/sentryproto"
"github.com/erigontech/erigon-lib/log/v3"
"github.com/erigontech/erigon/cmd/devnet/requests"
"github.com/erigontech/erigon/core/types"
"github.com/erigontech/erigon/tests/txpool/helper"
"github.com/holiman/uint256"
"github.com/stretchr/testify/require"
)

var (
// addr1 = 0x71562b71999873DB5b286dF957af199Ec94617F7
pkey1, _ = crypto.HexToECDSA("26e86e45f6fc45ec6e2ecd128cec80fa1d1505e5507dcd2ae58c3130a7a97b48")
addr1 = crypto.PubkeyToAddress(pkey1.PublicKey)

// addr2 = 0x9fB29AAc15b9A4B7F17c3385939b007540f4d791
pkey2, _ = crypto.HexToECDSA("45a915e4d060149eb4365960e6a7a45f334393093061116b197e3240065ff2d8")
addr2 = crypto.PubkeyToAddress(pkey2.PublicKey)

rpcAddressNode1 = "localhost:8545"
rpcAddressNode2 = "localhost:8546"
)

// Topology of the network:
// p2p_helper ---> node1 <--- RPC txns
//
// This test sends transaction to node1 RPC which means they are local for node1
// P2P helper is binded to node1 port, that's why we measure performance of local txs processing
func TestSimpleLocalTxThroughputBenchmark(t *testing.T) {
t.Skip()

txToSendCount := 15000
measureAtEvery := 1000

p2p := helper.NewP2P(fmt.Sprintf("http://%s/", rpcAddressNode1))

gotTxCh, errCh, err := p2p.Connect()
require.NoError(t, err)

start := time.Now()

// sender part
go func() {
rpcClient := requests.NewRequestGenerator(
rpcAddressNode1,
log.New(),
)

for i := 0; i < txToSendCount; i++ {
signedTx, err := types.SignTx(
&types.LegacyTx{
CommonTx: types.CommonTx{
Nonce: uint64(i),
Gas: 21000,
To: &addr2,
Value: uint256.NewInt(100),
Data: nil,
},
GasPrice: uint256.NewInt(1),
},
*types.LatestSignerForChainID(big.NewInt(1337)),
pkey1,
)
require.NoError(t, err)

_, err = rpcClient.SendTransaction(signedTx)
require.NoError(t, err)
}
}()

lastMeasureTime := time.Now()
gotTx := 0

for gotTx < txToSendCount {
select {
case msg := <-gotTxCh:
if msg.MessageID != sentryproto.MessageId_TRANSACTIONS_66 {
continue
}

gotTx += 1

if gotTx%measureAtEvery != 0 {
continue
}

fmt.Printf("Tx/s: (%d txs processed): %.2f / s \n", measureAtEvery, float64(measureAtEvery)*float64(time.Second)/float64(time.Since(lastMeasureTime)))
lastMeasureTime = time.Now()

case err := <-errCh:
require.NoError(t, err)
}
}

fmt.Printf("\nTx/s: (total %d txs processed): %.2f / s \n", txToSendCount, float64(txToSendCount)*float64(time.Second)/float64(time.Since(start)))
fmt.Println("Processed time:", time.Since(start))
}

// Topology of the network:
// p2p_helper ---> node1 <--- RPC txns
//
// This test sends transaction to node1 RPC which means they are local for node1
// P2P helper is binded to node1 port, that's why we measure performance of local txs processing
func TestSimpleLocalTxLatencyBenchmark(t *testing.T) {
t.Skip()

txToSendCount := 1000

p2p := helper.NewP2P(fmt.Sprintf("http://%s/", rpcAddressNode1))

gotTxCh, errCh, err := p2p.Connect()
require.NoError(t, err)

rpcClient := requests.NewRequestGenerator(
rpcAddressNode1,
log.New(),
)

averageLatency := time.Duration(0)

for i := 0; i < txToSendCount; i++ {
signedTx, err := types.SignTx(
&types.LegacyTx{
CommonTx: types.CommonTx{
Nonce: uint64(i),
Gas: 21000,
To: &addr2,
Value: uint256.NewInt(100),
Data: nil,
},
GasPrice: uint256.NewInt(1),
},
*types.LatestSignerForChainID(big.NewInt(1337)),
pkey1,
)
require.NoError(t, err)

start := time.Now()

_, err = rpcClient.SendTransaction(signedTx)
require.NoError(t, err)

for stop := false; !stop; {
select {
case msg := <-gotTxCh:
if msg.MessageID == sentryproto.MessageId_TRANSACTIONS_66 {
stop = true
}
case err := <-errCh:
require.NoError(t, err)
}
}

averageLatency += time.Since(start)
}

averageLatency = averageLatency / time.Duration(txToSendCount)
fmt.Println("Avg latency:", averageLatency)
}

0 comments on commit c0a66e4

Please sign in to comment.