From 7355313aed3e8d2d5eda3a114b0827095033a7f1 Mon Sep 17 00:00:00 2001
From: Scott Fairclough <70711990+hexoscott@users.noreply.github.com>
Date: Thu, 23 Jan 2025 11:23:40 +0000
Subject: [PATCH] sender lock to help prevent incorrect nonce checks (#1643)
 (#1671)

* sender lock to help prevent incorrect nonce checks (#1643)

* sender lock to help prevent incorrect nonce checks

Helps to ensure the pool has finished working with a sent transaction
before any nonce checks are allowed to run. This only applies to the
pending nonce; "latest" does not use this lock.

* tidy up on waiting for sender lock

---
 turbo/jsonrpc/eth_accounts.go            |  21 ++++
 turbo/jsonrpc/eth_api.go                 |   2 +
 turbo/jsonrpc/send_transaction.go        |  20 ++++
 turbo/jsonrpc/zkevm_sender_locks.go      |  47 ++++++++
 turbo/jsonrpc/zkevm_sender_locks_test.go | 140 +++++++++++++++++++++++
 5 files changed, 230 insertions(+)
 create mode 100644 turbo/jsonrpc/zkevm_sender_locks.go
 create mode 100644 turbo/jsonrpc/zkevm_sender_locks_test.go

diff --git a/turbo/jsonrpc/eth_accounts.go b/turbo/jsonrpc/eth_accounts.go
index b80b6cc1542..c678e1b3ad3 100644
--- a/turbo/jsonrpc/eth_accounts.go
+++ b/turbo/jsonrpc/eth_accounts.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"math/big"
+	"time"
 
 	"github.com/ledgerwatch/erigon-lib/common/hexutil"
 
@@ -13,6 +14,7 @@ import (
 	"google.golang.org/grpc"
 
 	"github.com/ledgerwatch/erigon/turbo/rpchelper"
+	"github.com/ledgerwatch/log/v3"
 
 	txpool_proto "github.com/ledgerwatch/erigon-lib/gointerfaces/txpool"
 
@@ -63,6 +65,8 @@ func (api *APIImpl) GetTransactionCount(ctx context.Context, address libcommon.A
 	}
 
 	if blockNrOrHash.BlockNumber != nil && *blockNrOrHash.BlockNumber == rpc.PendingBlockNumber {
+		api.waitForSenderLockToRelease(address)
+
 		reply, err := api.txPool.Nonce(ctx, &txpool_proto.NonceRequest{
 			Address: gointerfaces.ConvertAddressToH160(address),
 		}, &grpc.EmptyCallOption{})
@@ -178,3 +182,20 @@
 
 	return true, nil
 }
+
+func (api *APIImpl) waitForSenderLockToRelease(address libcommon.Address) {
+	waits := 0
+	for {
+		lock := api.SenderLocks.GetLock(address)
+		if lock > 0 {
+			time.Sleep(2 * time.Millisecond)
+			waits++
+			if waits > 250 { // cap the wait at roughly 500ms so the call cannot hang
+				log.Debug("waiting too long for transaction processing, returning default behaviour for nonce", "address", address.String())
+				break
+			}
+			continue
+		}
+		break
+	}
+}
diff --git a/turbo/jsonrpc/eth_api.go b/turbo/jsonrpc/eth_api.go
index 421c49d1f4c..ebfe76ee06b 100644
--- a/turbo/jsonrpc/eth_api.go
+++ b/turbo/jsonrpc/eth_api.go
@@ -377,6 +377,7 @@ type APIImpl struct {
 	VirtualCountersSmtReduction   float64
 	RejectLowGasPriceTransactions bool
 	BadTxAllowance                uint64
+	SenderLocks                   *SenderLock
 }
 
 // NewEthAPI returns APIImpl instance
@@ -411,6 +412,7 @@ func NewEthAPI(base *BaseAPI, db kv.RoDB, eth rpchelper.ApiBackend, txPool txpoo
 		VirtualCountersSmtReduction:   ethCfg.VirtualCountersSmtReduction,
 		RejectLowGasPriceTransactions: ethCfg.RejectLowGasPriceTransactions,
 		BadTxAllowance:                ethCfg.BadTxAllowance,
+		SenderLocks:                   NewSenderLock(),
 	}
 }
 
diff --git a/turbo/jsonrpc/send_transaction.go b/turbo/jsonrpc/send_transaction.go
index 569ace6796a..80f590268d3 100644
--- a/turbo/jsonrpc/send_transaction.go
+++ b/turbo/jsonrpc/send_transaction.go
@@ -13,6 +13,7 @@ import (
 	"github.com/ledgerwatch/erigon/core/types"
 	"github.com/ledgerwatch/erigon/params"
 	"github.com/ledgerwatch/erigon/rpc"
+	"github.com/ledgerwatch/erigon/turbo/rpchelper"
 	"github.com/ledgerwatch/erigon/zk/hermez_db"
 	"github.com/ledgerwatch/erigon/zk/utils"
 )
@@ -48,6 +49,25 @@ func (api *APIImpl) SendRawTransaction(ctx context.Context, encodedTx hexutility
 		return common.Hash{}, err
 	}
 
+	latestBlockNumber, err := rpchelper.GetLatestFinishedBlockNumber(tx)
+	if err != nil {
+		return common.Hash{}, err
+	}
+
+	header, err := api.blockByNumber(ctx, rpc.BlockNumber(latestBlockNumber), tx)
+	if err != nil {
+		return common.Hash{}, err
+	}
+
+	// now get the sender and put a lock in place for them
+	signer := types.MakeSigner(cc, latestBlockNumber, header.Time())
+	sender, err := txn.Sender(*signer)
+	if err != nil {
+		return common.Hash{}, err
+	}
+	api.SenderLocks.AddLock(sender)
+	defer api.SenderLocks.ReleaseLock(sender)
+
 	if txn.Type() != types.LegacyTxType {
 		latestBlock, err := api.blockByNumber(ctx, rpc.LatestBlockNumber, tx)
 		if err != nil {
diff --git a/turbo/jsonrpc/zkevm_sender_locks.go b/turbo/jsonrpc/zkevm_sender_locks.go
new file mode 100644
index 00000000000..ce4f5eeb227
--- /dev/null
+++ b/turbo/jsonrpc/zkevm_sender_locks.go
@@ -0,0 +1,47 @@
+package jsonrpc
+
+import (
+	"sync"
+
+	"github.com/ledgerwatch/erigon-lib/common"
+)
+
+// SenderLock is a map of sender addresses to the number of locks they hold.
+// It is used to ensure that any call for an account's pending nonce waits
+// until all in-flight transactions for that account have been processed.
+// Without it, a nonce check can race the pool and come back too low when
+// the pool is taking a long time to process a transaction.
+type SenderLock struct {
+	mtx   sync.RWMutex
+	locks map[common.Address]uint64
+}
+
+func NewSenderLock() *SenderLock {
+	return &SenderLock{
+		locks: make(map[common.Address]uint64),
+	}
+}
+
+func (sl *SenderLock) GetLock(sender common.Address) uint64 {
+	sl.mtx.RLock()
+	defer sl.mtx.RUnlock()
+	return sl.locks[sender]
+}
+
+func (sl *SenderLock) ReleaseLock(sender common.Address) {
+	sl.mtx.Lock()
+	defer sl.mtx.Unlock()
+	if current, ok := sl.locks[sender]; ok {
+		if current <= 1 {
+			delete(sl.locks, sender)
+		} else {
+			sl.locks[sender] = current - 1
+		}
+	}
+}
+
+func (sl *SenderLock) AddLock(sender common.Address) {
+	sl.mtx.Lock()
+	defer sl.mtx.Unlock()
+	sl.locks[sender]++
+}
diff --git a/turbo/jsonrpc/zkevm_sender_locks_test.go b/turbo/jsonrpc/zkevm_sender_locks_test.go
new file mode 100644
index 00000000000..b4a48f2144c
--- /dev/null
+++ b/turbo/jsonrpc/zkevm_sender_locks_test.go
@@ -0,0 +1,140 @@
+package jsonrpc
+
+import (
+	"sync"
+	"testing"
+
+	"github.com/ledgerwatch/erigon-lib/common"
+)
+
+func Test_SenderLocks(t *testing.T) {
+	sl := NewSenderLock()
+	addr := common.HexToAddress("0x1")
+
+	// ensure 0 to start
+	lock := sl.GetLock(addr)
+	if lock != 0 {
+		t.Fatalf("expected lock to be 0, got %d", lock)
+	}
+
+	// add a lock and check it shows
+	sl.AddLock(addr)
+	lock = sl.GetLock(addr)
+	if lock != 1 {
+		t.Fatalf("expected lock to be 1, got %d", lock)
+	}
+
+	// add another lock and check it shows
+	sl.AddLock(addr)
+	lock = sl.GetLock(addr)
+	if lock != 2 {
+		t.Fatalf("expected lock to be 2, got %d", lock)
+	}
+
+	// now release one and check it shows
+	sl.ReleaseLock(addr)
+	lock = sl.GetLock(addr)
+	if lock != 1 {
+		t.Fatalf("expected lock to be 1, got %d", lock)
+	}
+
+	// now release the last one and check it shows
+	sl.ReleaseLock(addr)
+	lock = sl.GetLock(addr)
+	if lock != 0 {
+		t.Fatalf("expected lock to be 0, got %d", lock)
+	}
+	if len(sl.locks) != 0 {
+		t.Fatalf("expected locks map to be empty, got %d entries", len(sl.locks))
+	}
+}
+
+func Test_SenderLocks_Concurrency(t *testing.T) {
+	sl := NewSenderLock()
+	addr := common.HexToAddress("0x1")
+
+	wg := sync.WaitGroup{}
+	wg.Add(1000)
+	for i := 0; i < 1000; i++ {
+		go func() {
+			defer wg.Done()
+			sl.AddLock(addr)
+		}()
+	}
+	wg.Wait()
+
+	lock := sl.GetLock(addr)
+	if lock != 1000 {
+		t.Fatalf("expected lock to be 1000, got %d", lock)
+	}
+
+	// now release all the locks concurrently
+	wg.Add(1000)
+	for i := 0; i < 1000; i++ {
+		go func() {
+			defer wg.Done()
+			sl.ReleaseLock(addr)
+		}()
+	}
+	wg.Wait()
+
+	lock = sl.GetLock(addr)
+	if lock != 0 {
+		t.Fatalf("expected lock to be 0, got %d", lock)
+	}
+	if len(sl.locks) != 0 {
+		t.Fatalf("expected locks map to be empty, got %d entries", len(sl.locks))
+	}
+}
+
+func Test_SenderLocks_MultipleAccounts(t *testing.T) {
+	sl := NewSenderLock()
+	addr1 := common.HexToAddress("0x1")
+	addr2 := common.HexToAddress("0x2")
+
+	sl.AddLock(addr1)
+	sl.AddLock(addr2)
+
+	lock1 := sl.GetLock(addr1)
+	lock2 := sl.GetLock(addr2)
+	if lock1 != 1 {
+		t.Fatalf("expected lock to be 1, got %d", lock1)
+	}
+	if lock2 != 1 {
+		t.Fatalf("expected lock to be 1, got %d", lock2)
+	}
+
+	sl.ReleaseLock(addr1)
+
+	lock1 = sl.GetLock(addr1)
+	lock2 = sl.GetLock(addr2)
+	if lock1 != 0 {
+		t.Fatalf("expected lock to be 0, got %d", lock1)
+	}
+	if lock2 != 1 {
+		t.Fatalf("expected lock to be 1, got %d", lock2)
+	}
+
+	sl.ReleaseLock(addr2)
+
+	lock1 = sl.GetLock(addr1)
+	lock2 = sl.GetLock(addr2)
+	if lock1 != 0 {
+		t.Fatalf("expected lock to be 0, got %d", lock1)
+	}
+	if lock2 != 0 {
+		t.Fatalf("expected lock to be 0, got %d", lock2)
+	}
+}
+
+func Test_SenderLocks_ReleaseWhenEmpty(t *testing.T) {
+	sl := NewSenderLock()
+	addr := common.HexToAddress("0x1")
+
+	sl.ReleaseLock(addr)
+
+	lock := sl.GetLock(addr)
+	if lock != 0 {
+		t.Fatalf("expected lock to be 0, got %d", lock)
+	}
+}
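
Note for reviewers (illustration only, not part of the patch): the mechanism spans three files, so below is a minimal standalone Go sketch of how the pieces interact. The SenderLock methods mirror turbo/jsonrpc/zkevm_sender_locks.go; the "pool", the nonce value, and the helper names in main() are simulated stand-ins, not real Erigon APIs.

// senderlock_sketch.go - self-contained illustration of the sender-lock flow.
package main

import (
	"fmt"
	"sync"
	"time"
)

// SenderLock mirrors the patched type, keyed by a plain string here.
type SenderLock struct {
	mtx   sync.RWMutex
	locks map[string]uint64
}

func NewSenderLock() *SenderLock {
	return &SenderLock{locks: make(map[string]uint64)}
}

func (sl *SenderLock) AddLock(sender string) {
	sl.mtx.Lock()
	defer sl.mtx.Unlock()
	sl.locks[sender]++
}

func (sl *SenderLock) ReleaseLock(sender string) {
	sl.mtx.Lock()
	defer sl.mtx.Unlock()
	if current, ok := sl.locks[sender]; ok {
		if current <= 1 {
			delete(sl.locks, sender)
		} else {
			sl.locks[sender] = current - 1
		}
	}
}

func (sl *SenderLock) GetLock(sender string) uint64 {
	sl.mtx.RLock()
	defer sl.mtx.RUnlock()
	return sl.locks[sender]
}

// waitForSenderLockToRelease mirrors the loop added to GetTransactionCount:
// poll every 2ms, giving up after 250 waits (~500ms) so the call cannot hang.
func waitForSenderLockToRelease(sl *SenderLock, sender string) {
	waits := 0
	for sl.GetLock(sender) > 0 {
		if waits > 250 {
			fmt.Println("waited too long, falling back to the pool nonce as-is")
			return
		}
		time.Sleep(2 * time.Millisecond)
		waits++
	}
}

func main() {
	sl := NewSenderLock()
	sender := "0xsender"
	poolNonce := uint64(5) // pool still reports the pre-transaction nonce

	var wg sync.WaitGroup
	wg.Add(1)
	go func() { // plays the role of SendRawTransaction
		defer wg.Done()
		sl.AddLock(sender)
		defer sl.ReleaseLock(sender)
		time.Sleep(50 * time.Millisecond) // pool is slow to process the tx
		poolNonce = 6                     // pool finally accounts for the tx
	}()

	time.Sleep(5 * time.Millisecond) // let the send take its lock first

	// plays the role of GetTransactionCount with "pending": without the
	// wait this would read the stale nonce 5 and break the next send.
	waitForSenderLockToRelease(sl, sender)
	fmt.Println("pending nonce:", poolNonce) // prints 6, not the stale 5
	wg.Wait()
}

The design choice worth noting is that the reader polls with a hard cap (250 waits of 2ms, roughly 500ms) instead of blocking on the lock: if the pool stalls, eth_getTransactionCount degrades to today's behaviour (whatever nonce the pool reports) rather than hanging the RPC call.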