Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sender lock to help prevent incorrect nonce checks (#1643) (#1671) #1681

Merged
merged 1 commit into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions turbo/jsonrpc/eth_accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"math/big"
"time"

"github.com/ledgerwatch/erigon-lib/common/hexutil"

Expand All @@ -13,6 +14,7 @@ import (
"google.golang.org/grpc"

"github.com/ledgerwatch/erigon/turbo/rpchelper"
"github.com/ledgerwatch/log/v3"

txpool_proto "github.com/ledgerwatch/erigon-lib/gointerfaces/txpool"

Expand Down Expand Up @@ -63,6 +65,8 @@ func (api *APIImpl) GetTransactionCount(ctx context.Context, address libcommon.A
}

if blockNrOrHash.BlockNumber != nil && *blockNrOrHash.BlockNumber == rpc.PendingBlockNumber {
api.waitForSenderLockToRelease(address)

reply, err := api.txPool.Nonce(ctx, &txpool_proto.NonceRequest{
Address: gointerfaces.ConvertAddressToH160(address),
}, &grpc.EmptyCallOption{})
Expand Down Expand Up @@ -178,3 +182,20 @@ func (api *APIImpl) Exist(ctx context.Context, address libcommon.Address, blockN

return true, nil
}

func (api *APIImpl) waitForSenderLockToRelease(address libcommon.Address) {
waits := 0
for {
lock := api.SenderLocks.GetLock(address)
if lock > 0 {
time.Sleep(2 * time.Millisecond)
waits++
if waits > 250 {
log.Debug("waiting too long for transaction processing, returning default behaviour for nonce", "address", address.String())
break
}
continue
}
break
}
}
2 changes: 2 additions & 0 deletions turbo/jsonrpc/eth_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ type APIImpl struct {
VirtualCountersSmtReduction float64
RejectLowGasPriceTransactions bool
BadTxAllowance uint64
SenderLocks *SenderLock
}

// NewEthAPI returns APIImpl instance
Expand Down Expand Up @@ -411,6 +412,7 @@ func NewEthAPI(base *BaseAPI, db kv.RoDB, eth rpchelper.ApiBackend, txPool txpoo
VirtualCountersSmtReduction: ethCfg.VirtualCountersSmtReduction,
RejectLowGasPriceTransactions: ethCfg.RejectLowGasPriceTransactions,
BadTxAllowance: ethCfg.BadTxAllowance,
SenderLocks: NewSenderLock(),
}
}

Expand Down
20 changes: 20 additions & 0 deletions turbo/jsonrpc/send_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/params"
"github.com/ledgerwatch/erigon/rpc"
"github.com/ledgerwatch/erigon/turbo/rpchelper"
"github.com/ledgerwatch/erigon/zk/hermez_db"
"github.com/ledgerwatch/erigon/zk/utils"
)
Expand Down Expand Up @@ -48,6 +49,25 @@ func (api *APIImpl) SendRawTransaction(ctx context.Context, encodedTx hexutility
return common.Hash{}, err
}

latestBlockNumber, err := rpchelper.GetLatestFinishedBlockNumber(tx)
if err != nil {
return common.Hash{}, err
}

header, err := api.blockByNumber(ctx, rpc.BlockNumber(latestBlockNumber), tx)
if err != nil {
return common.Hash{}, err
}

// now get the sender and put a lock in place for them
signer := types.MakeSigner(cc, latestBlockNumber, header.Time())
sender, err := txn.Sender(*signer)
if err != nil {
return common.Hash{}, err
}
api.SenderLocks.AddLock(sender)
defer api.SenderLocks.ReleaseLock(sender)

if txn.Type() != types.LegacyTxType {
latestBlock, err := api.blockByNumber(ctx, rpc.LatestBlockNumber, tx)

Expand Down
47 changes: 47 additions & 0 deletions turbo/jsonrpc/zkevm_sender_locks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package jsonrpc

import (
"sync"

"github.com/ledgerwatch/erigon-lib/common"
)

// SenderLock is a map of sender addresses to the number of locks they have
// This is used to ensure that any calls for an account nonce will wait until all
// pending transactions for that account have been processed. Without this you can
// get strange race behaviour where a nonce will come back too low if the pool is taking
// a long time to process a transaction.
type SenderLock struct {
mtx sync.RWMutex
locks map[common.Address]uint64
}

func NewSenderLock() *SenderLock {
return &SenderLock{
locks: make(map[common.Address]uint64),
}
}

func (sl *SenderLock) GetLock(sender common.Address) uint64 {
sl.mtx.Lock()
defer sl.mtx.Unlock()
return sl.locks[sender]
}

func (sl *SenderLock) ReleaseLock(sender common.Address) {
sl.mtx.Lock()
defer sl.mtx.Unlock()
if current, ok := sl.locks[sender]; ok {
if current <= 1 {
delete(sl.locks, sender)
} else {
sl.locks[sender] = current - 1
}
}
}

func (sl *SenderLock) AddLock(sender common.Address) {
sl.mtx.Lock()
defer sl.mtx.Unlock()
sl.locks[sender]++
}
140 changes: 140 additions & 0 deletions turbo/jsonrpc/zkevm_sender_locks_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package jsonrpc

import (
"sync"
"testing"

"github.com/ledgerwatch/erigon-lib/common"
)

func Test_SenderLocks(t *testing.T) {
sl := NewSenderLock()
addr := common.HexToAddress("0x1")

// ensure 0 to start
lock := sl.GetLock(addr)
if lock != 0 {
t.Fatalf("expected lock to be 0, got %d", lock)
}

// add a lock and check it shows
sl.AddLock(addr)
lock = sl.GetLock(addr)
if lock != 1 {
t.Fatalf("expected lock to be 1, got %d", lock)
}

// add another lock and check it shows
sl.AddLock(addr)
lock = sl.GetLock(addr)
if lock != 2 {
t.Fatalf("expected lock to be 2, got %d", lock)
}

// now release one and check it shows
sl.ReleaseLock(addr)
lock = sl.GetLock(addr)
if lock != 1 {
t.Fatalf("expected lock to be 1, got %d", lock)
}

// now release the last one and check it shows
sl.ReleaseLock(addr)
lock = sl.GetLock(addr)
if lock != 0 {
t.Fatalf("expected lock to be 0, got %d", lock)
}
if len(sl.locks) != 0 {
t.Fatalf("expected lock to be 0, got %d", len(sl.locks))
}
}

func Test_SenderLocks_Concurrency(t *testing.T) {
sl := NewSenderLock()
addr := common.HexToAddress("0x1")

wg := sync.WaitGroup{}
wg.Add(1000)
for i := 0; i < 1000; i++ {
go func() {
defer wg.Done()
sl.AddLock(addr)
}()
}
wg.Wait()

lock := sl.GetLock(addr)
if lock != 1000 {
t.Fatalf("expected lock to be 1000, got %d", lock)
}

// now release all the locks concurrently
wg.Add(1000)
for i := 0; i < 1000; i++ {
go func() {
defer wg.Done()
sl.ReleaseLock(addr)
}()
}
wg.Wait()

lock = sl.GetLock(addr)
if lock != 0 {
t.Fatalf("expected lock to be 0, got %d", lock)
}
if len(sl.locks) != 0 {
t.Fatalf("expected lock to be 0, got %d", len(sl.locks))
}
}

func Test_SenderLocks_MultipleAccounts(t *testing.T) {
sl := NewSenderLock()
addr1 := common.HexToAddress("0x1")
addr2 := common.HexToAddress("0x2")

sl.AddLock(addr1)
sl.AddLock(addr2)

lock1 := sl.GetLock(addr1)
lock2 := sl.GetLock(addr2)
if lock1 != 1 {
t.Fatalf("expected lock to be 1, got %d", lock1)
}
if lock2 != 1 {
t.Fatalf("expected lock to be 1, got %d", lock2)
}

sl.ReleaseLock(addr1)

lock1 = sl.GetLock(addr1)
lock2 = sl.GetLock(addr2)
if lock1 != 0 {
t.Fatalf("expected lock to be 1, got %d", lock1)
}
if lock2 != 1 {
t.Fatalf("expected lock to be 1, got %d", lock2)
}

sl.ReleaseLock(addr2)

lock1 = sl.GetLock(addr1)
lock2 = sl.GetLock(addr2)
if lock1 != 0 {
t.Fatalf("expected lock to be 1, got %d", lock1)
}
if lock2 != 0 {
t.Fatalf("expected lock to be 1, got %d", lock2)
}
}

func Test_SenderLocks_ReleaseWhenEmpty(t *testing.T) {
sl := NewSenderLock()
addr := common.HexToAddress("0x1")

sl.ReleaseLock(addr)

lock := sl.GetLock(addr)
if lock != 0 {
t.Fatalf("expected lock to be 0, got %d", lock)
}
}
Loading