sender lock to help prevent incorrect nonce checks (#1643) (#1671)
* sender lock to help prevent incorrect nonce checks (#1643)

* sender lock to help prevent incorrect nonce checks

Helps to ensure the pool has finished working with a sent transaction before any nonce checks are allowed to run. This applies only to the pending nonce; "latest" never takes this lock. (A condensed sketch of the pattern follows the file summary below.)

* tidy up on waiting for sender lock
hexoscott committed Jan 24, 2025
1 parent 1408b00 commit 7355313
Showing 5 changed files with 230 additions and 0 deletions.
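
At a glance, the change pairs a counted per-sender lock taken in SendRawTransaction with a polling wait in the pending-nonce path of GetTransactionCount. Below is a minimal, self-contained sketch of that pattern, not the committed code: it uses string keys instead of common.Address and omits the wait cap for brevity. The diffs that follow are the authoritative version.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// senderLock mirrors the SenderLock type added in this commit: a counter
// per sender rather than a mutex, so multiple in-flight sends can stack.
type senderLock struct {
	mtx   sync.Mutex
	locks map[string]uint64
}

func (sl *senderLock) add(s string) {
	sl.mtx.Lock()
	defer sl.mtx.Unlock()
	sl.locks[s]++
}

func (sl *senderLock) release(s string) {
	sl.mtx.Lock()
	defer sl.mtx.Unlock()
	if n := sl.locks[s]; n <= 1 {
		delete(sl.locks, s) // last lock gone: drop the map entry
	} else {
		sl.locks[s] = n - 1
	}
}

func (sl *senderLock) get(s string) uint64 {
	sl.mtx.Lock()
	defer sl.mtx.Unlock()
	return sl.locks[s]
}

func main() {
	sl := &senderLock{locks: make(map[string]uint64)}
	sender := "0x1"

	// SendRawTransaction side: hold the lock while the pool ingests the tx.
	sl.add(sender)
	go func() {
		time.Sleep(50 * time.Millisecond) // stand-in for slow pool processing
		sl.release(sender)
	}()

	// GetTransactionCount("pending") side: poll until no sends are in
	// flight, so the pool's nonce answer reflects the transaction above.
	for sl.get(sender) > 0 {
		time.Sleep(2 * time.Millisecond)
	}
	fmt.Println("pool settled; safe to ask for the pending nonce")
}
```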
21 changes: 21 additions & 0 deletions turbo/jsonrpc/eth_accounts.go
@@ -4,6 +4,7 @@ import (
	"context"
	"fmt"
	"math/big"
	"time"

	"github.com/ledgerwatch/erigon-lib/common/hexutil"

@@ -13,6 +14,7 @@ import (
	"google.golang.org/grpc"

	"github.com/ledgerwatch/erigon/turbo/rpchelper"
	"github.com/ledgerwatch/log/v3"

	txpool_proto "github.com/ledgerwatch/erigon-lib/gointerfaces/txpool"

@@ -63,6 +65,8 @@ func (api *APIImpl) GetTransactionCount(ctx context.Context, address libcommon.A
	}

	if blockNrOrHash.BlockNumber != nil && *blockNrOrHash.BlockNumber == rpc.PendingBlockNumber {
		api.waitForSenderLockToRelease(address)

		reply, err := api.txPool.Nonce(ctx, &txpool_proto.NonceRequest{
			Address: gointerfaces.ConvertAddressToH160(address),
		}, &grpc.EmptyCallOption{})
@@ -178,3 +182,20 @@ func (api *APIImpl) Exist(ctx context.Context, address libcommon.Address, blockN

	return true, nil
}

func (api *APIImpl) waitForSenderLockToRelease(address libcommon.Address) {
	waits := 0
	for {
		lock := api.SenderLocks.GetLock(address)
		if lock > 0 {
			time.Sleep(2 * time.Millisecond)
			waits++
			if waits > 250 {
				log.Debug("waiting too long for transaction processing, returning default behaviour for nonce", "address", address.String())
				break
			}
			continue
		}
		break
	}
}
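
With a 2 ms sleep per poll and a cap of 250 polls, waitForSenderLockToRelease blocks for at most roughly 500 ms; after that it logs at debug level and falls through to the pool's default nonce behaviour rather than failing the request.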
2 changes: 2 additions & 0 deletions turbo/jsonrpc/eth_api.go
@@ -377,6 +377,7 @@ type APIImpl struct {
	VirtualCountersSmtReduction   float64
	RejectLowGasPriceTransactions bool
	BadTxAllowance                uint64
	SenderLocks                   *SenderLock
}

// NewEthAPI returns APIImpl instance
@@ -411,6 +412,7 @@ func NewEthAPI(base *BaseAPI, db kv.RoDB, eth rpchelper.ApiBackend, txPool txpoo
		VirtualCountersSmtReduction:   ethCfg.VirtualCountersSmtReduction,
		RejectLowGasPriceTransactions: ethCfg.RejectLowGasPriceTransactions,
		BadTxAllowance:                ethCfg.BadTxAllowance,
		SenderLocks:                   NewSenderLock(),
	}
}

20 changes: 20 additions & 0 deletions turbo/jsonrpc/send_transaction.go
@@ -13,6 +13,7 @@ import (
	"github.com/ledgerwatch/erigon/core/types"
	"github.com/ledgerwatch/erigon/params"
	"github.com/ledgerwatch/erigon/rpc"
	"github.com/ledgerwatch/erigon/turbo/rpchelper"
	"github.com/ledgerwatch/erigon/zk/hermez_db"
	"github.com/ledgerwatch/erigon/zk/utils"
)
@@ -48,6 +49,25 @@ func (api *APIImpl) SendRawTransaction(ctx context.Context, encodedTx hexutility
		return common.Hash{}, err
	}

	latestBlockNumber, err := rpchelper.GetLatestFinishedBlockNumber(tx)
	if err != nil {
		return common.Hash{}, err
	}

	header, err := api.blockByNumber(ctx, rpc.BlockNumber(latestBlockNumber), tx)
	if err != nil {
		return common.Hash{}, err
	}

	// now get the sender and put a lock in place for them
	signer := types.MakeSigner(cc, latestBlockNumber, header.Time())
	sender, err := txn.Sender(*signer)
	if err != nil {
		return common.Hash{}, err
	}
	api.SenderLocks.AddLock(sender)
	defer api.SenderLocks.ReleaseLock(sender)

	if txn.Type() != types.LegacyTxType {
		latestBlock, err := api.blockByNumber(ctx, rpc.LatestBlockNumber, tx)
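Because ReleaseLock is deferred, the sender's lock is held until SendRawTransaction returns, i.e. after the pool add call has completed; a concurrent pending-nonce query for the same sender will wait (up to the ~500 ms cap described above) before reading from the pool.
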
47 changes: 47 additions & 0 deletions turbo/jsonrpc/zkevm_sender_locks.go
@@ -0,0 +1,47 @@
package jsonrpc

import (
	"sync"

	"github.com/ledgerwatch/erigon-lib/common"
)

// SenderLock is a map of sender addresses to the number of locks they have.
// This is used to ensure that any calls for an account nonce will wait until all
// pending transactions for that account have been processed. Without this you can
// get strange race behaviour where a nonce will come back too low if the pool is taking
// a long time to process a transaction.
type SenderLock struct {
	mtx   sync.RWMutex
	locks map[common.Address]uint64
}

func NewSenderLock() *SenderLock {
	return &SenderLock{
		locks: make(map[common.Address]uint64),
	}
}

func (sl *SenderLock) GetLock(sender common.Address) uint64 {
	sl.mtx.Lock()
	defer sl.mtx.Unlock()
	return sl.locks[sender]
}

func (sl *SenderLock) ReleaseLock(sender common.Address) {
	sl.mtx.Lock()
	defer sl.mtx.Unlock()
	if current, ok := sl.locks[sender]; ok {
		if current <= 1 {
			delete(sl.locks, sender)
		} else {
			sl.locks[sender] = current - 1
		}
	}
}

func (sl *SenderLock) AddLock(sender common.Address) {
	sl.mtx.Lock()
	defer sl.mtx.Unlock()
	sl.locks[sender]++
}
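
Note that SenderLock is a counted lock rather than a mutex: AddLock and ReleaseLock only adjust a per-address counter, and nothing blocks inside SenderLock itself. The actual waiting happens in waitForSenderLockToRelease, which polls GetLock until the count drops to zero or the poll cap is hit.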
140 changes: 140 additions & 0 deletions turbo/jsonrpc/zkevm_sender_locks_test.go
@@ -0,0 +1,140 @@
package jsonrpc

import (
	"sync"
	"testing"

	"github.com/ledgerwatch/erigon-lib/common"
)

func Test_SenderLocks(t *testing.T) {
	sl := NewSenderLock()
	addr := common.HexToAddress("0x1")

	// ensure 0 to start
	lock := sl.GetLock(addr)
	if lock != 0 {
		t.Fatalf("expected lock to be 0, got %d", lock)
	}

	// add a lock and check it shows
	sl.AddLock(addr)
	lock = sl.GetLock(addr)
	if lock != 1 {
		t.Fatalf("expected lock to be 1, got %d", lock)
	}

	// add another lock and check it shows
	sl.AddLock(addr)
	lock = sl.GetLock(addr)
	if lock != 2 {
		t.Fatalf("expected lock to be 2, got %d", lock)
	}

	// now release one and check it shows
	sl.ReleaseLock(addr)
	lock = sl.GetLock(addr)
	if lock != 1 {
		t.Fatalf("expected lock to be 1, got %d", lock)
	}

	// now release the last one and check it shows
	sl.ReleaseLock(addr)
	lock = sl.GetLock(addr)
	if lock != 0 {
		t.Fatalf("expected lock to be 0, got %d", lock)
	}
	if len(sl.locks) != 0 {
		t.Fatalf("expected locks map to be empty, got %d entries", len(sl.locks))
	}
}

func Test_SenderLocks_Concurrency(t *testing.T) {
	sl := NewSenderLock()
	addr := common.HexToAddress("0x1")

	wg := sync.WaitGroup{}
	wg.Add(1000)
	for i := 0; i < 1000; i++ {
		go func() {
			defer wg.Done()
			sl.AddLock(addr)
		}()
	}
	wg.Wait()

	lock := sl.GetLock(addr)
	if lock != 1000 {
		t.Fatalf("expected lock to be 1000, got %d", lock)
	}

	// now release all the locks concurrently
	wg.Add(1000)
	for i := 0; i < 1000; i++ {
		go func() {
			defer wg.Done()
			sl.ReleaseLock(addr)
		}()
	}
	wg.Wait()

	lock = sl.GetLock(addr)
	if lock != 0 {
		t.Fatalf("expected lock to be 0, got %d", lock)
	}
	if len(sl.locks) != 0 {
		t.Fatalf("expected locks map to be empty, got %d entries", len(sl.locks))
	}
}

func Test_SenderLocks_MultipleAccounts(t *testing.T) {
	sl := NewSenderLock()
	addr1 := common.HexToAddress("0x1")
	addr2 := common.HexToAddress("0x2")

	sl.AddLock(addr1)
	sl.AddLock(addr2)

	lock1 := sl.GetLock(addr1)
	lock2 := sl.GetLock(addr2)
	if lock1 != 1 {
		t.Fatalf("expected lock to be 1, got %d", lock1)
	}
	if lock2 != 1 {
		t.Fatalf("expected lock to be 1, got %d", lock2)
	}

	sl.ReleaseLock(addr1)

	lock1 = sl.GetLock(addr1)
	lock2 = sl.GetLock(addr2)
	if lock1 != 0 {
		t.Fatalf("expected lock to be 0, got %d", lock1)
	}
	if lock2 != 1 {
		t.Fatalf("expected lock to be 1, got %d", lock2)
	}

	sl.ReleaseLock(addr2)

	lock1 = sl.GetLock(addr1)
	lock2 = sl.GetLock(addr2)
	if lock1 != 0 {
		t.Fatalf("expected lock to be 0, got %d", lock1)
	}
	if lock2 != 0 {
		t.Fatalf("expected lock to be 0, got %d", lock2)
	}
}

func Test_SenderLocks_ReleaseWhenEmpty(t *testing.T) {
	sl := NewSenderLock()
	addr := common.HexToAddress("0x1")

	sl.ReleaseLock(addr)

	lock := sl.GetLock(addr)
	if lock != 0 {
		t.Fatalf("expected lock to be 0, got %d", lock)
	}
}
