diff --git a/cmd/nitro/nitro.go b/cmd/nitro/nitro.go index 0684d5348c..e4e1b79353 100644 --- a/cmd/nitro/nitro.go +++ b/cmd/nitro/nitro.go @@ -689,18 +689,6 @@ func mainImpl() int { } } - execNodeConfig := execNode.ConfigFetcher() - if execNodeConfig.Sequencer.Enable && execNodeConfig.Sequencer.Timeboost.Enable { - execNode.Sequencer.StartExpressLane( - ctx, - execNode.Backend.APIBackend(), - execNode.FilterSystem, - common.HexToAddress(execNodeConfig.Sequencer.Timeboost.AuctionContractAddress), - common.HexToAddress(execNodeConfig.Sequencer.Timeboost.AuctioneerAddress), - execNodeConfig.Sequencer.Timeboost.EarlySubmissionGrace, - ) - } - err = nil select { case err = <-fatalErrChan: diff --git a/execution/gethexec/contract_adapter.go b/execution/gethexec/contract_adapter.go new file mode 100644 index 0000000000..446e3a5cae --- /dev/null +++ b/execution/gethexec/contract_adapter.go @@ -0,0 +1,85 @@ +// Copyright 2024-2025, Offchain Labs, Inc. +// For license information, see https://github.com/nitro/blob/master/LICENSE + +package gethexec + +import ( + "context" + "errors" + "math" + "math/big" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/arbitrum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/core/vm" + "github.com/ethereum/go-ethereum/eth/filters" + "github.com/ethereum/go-ethereum/rpc" +) + +// contractAdapter is an impl of bind.ContractBackend with necessary methods defined to work with the ExpressLaneAuction contract +type contractAdapter struct { + *filters.FilterAPI + bind.ContractTransactor // We leave this member unset as it is not used. + + apiBackend *arbitrum.APIBackend +} + +func (a *contractAdapter) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) { + logPointers, err := a.GetLogs(ctx, filters.FilterCriteria(q)) + if err != nil { + return nil, err + } + logs := make([]types.Log, 0, len(logPointers)) + for _, log := range logPointers { + logs = append(logs, *log) + } + return logs, nil +} + +func (a *contractAdapter) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, ch chan<- types.Log) (ethereum.Subscription, error) { + return nil, errors.New("contractAdapter doesn't implement SubscribeFilterLogs - shouldn't be needed") +} + +func (a *contractAdapter) CodeAt(ctx context.Context, contract common.Address, blockNumber *big.Int) ([]byte, error) { + return nil, errors.New("contractAdapter doesn't implement CodeAt - shouldn't be needed") +} + +func (a *contractAdapter) CallContract(ctx context.Context, call ethereum.CallMsg, blockNumber *big.Int) ([]byte, error) { + var num rpc.BlockNumber = rpc.LatestBlockNumber + if blockNumber != nil { + num = rpc.BlockNumber(blockNumber.Int64()) + } + + state, header, err := a.apiBackend.StateAndHeaderByNumber(ctx, num) + if err != nil { + return nil, err + } + + msg := &core.Message{ + From: call.From, + To: call.To, + Value: big.NewInt(0), + GasLimit: math.MaxUint64, + GasPrice: big.NewInt(0), + GasFeeCap: big.NewInt(0), + GasTipCap: big.NewInt(0), + Data: call.Data, + AccessList: call.AccessList, + SkipAccountChecks: true, + TxRunMode: core.MessageEthcallMode, // Indicate this is an eth_call + SkipL1Charging: true, // Skip L1 data fees + } + + evm := a.apiBackend.GetEVM(ctx, msg, state, header, &vm.Config{NoBaseFee: true}, nil) + gp := new(core.GasPool).AddGas(math.MaxUint64) + result, err := core.ApplyMessage(evm, msg, gp) + if err != nil { + return nil, err + } + + return result.ReturnData, nil +} diff --git a/execution/gethexec/express_lane_service.go b/execution/gethexec/express_lane_service.go index c47d606c9e..73495f9d12 100644 --- a/execution/gethexec/express_lane_service.go +++ b/execution/gethexec/express_lane_service.go @@ -6,23 +6,17 @@ package gethexec import ( "context" "fmt" - "math" - "math/big" "sync" "time" "github.com/pkg/errors" - "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/arbitrum" "github.com/ethereum/go-ethereum/arbitrum_types" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/common/lru" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/core/vm" - "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/eth/filters" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" @@ -30,14 +24,10 @@ import ( "github.com/offchainlabs/nitro/solgen/go/express_lane_auctiongen" "github.com/offchainlabs/nitro/timeboost" + "github.com/offchainlabs/nitro/util/containers" "github.com/offchainlabs/nitro/util/stopwaiter" ) -type expressLaneControl struct { - sequence uint64 - controller common.Address -} - type transactionPublisher interface { PublishTimeboostedTransaction(context.Context, *types.Transaction, *arbitrum_types.ConditionalOptions, chan struct{}) error } @@ -47,93 +37,35 @@ type msgAndResult struct { resultChan chan error } -type expressLaneService struct { - stopwaiter.StopWaiter - sync.RWMutex - transactionPublisher transactionPublisher - auctionContractAddr common.Address - apiBackend *arbitrum.APIBackend - roundTimingInfo timeboost.RoundTimingInfo - earlySubmissionGrace time.Duration - txQueueTimeout time.Duration - chainConfig *params.ChainConfig - logs chan []*types.Log - auctionContract *express_lane_auctiongen.ExpressLaneAuction - roundControl *lru.Cache[uint64, *expressLaneControl] // thread safe +type expressLaneRoundInfo struct { + sequence uint64 msgAndResultBySequenceNumber map[uint64]*msgAndResult } -type contractAdapter struct { - *filters.FilterAPI - bind.ContractTransactor // We leave this member unset as it is not used. - - apiBackend *arbitrum.APIBackend -} - -func (a *contractAdapter) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) { - logPointers, err := a.GetLogs(ctx, filters.FilterCriteria(q)) - if err != nil { - return nil, err - } - logs := make([]types.Log, 0, len(logPointers)) - for _, log := range logPointers { - logs = append(logs, *log) - } - return logs, nil -} - -func (a *contractAdapter) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, ch chan<- types.Log) (ethereum.Subscription, error) { - panic("contractAdapter doesn't implement SubscribeFilterLogs - shouldn't be needed") -} - -func (a *contractAdapter) CodeAt(ctx context.Context, contract common.Address, blockNumber *big.Int) ([]byte, error) { - panic("contractAdapter doesn't implement CodeAt - shouldn't be needed") -} - -func (a *contractAdapter) CallContract(ctx context.Context, call ethereum.CallMsg, blockNumber *big.Int) ([]byte, error) { - var num rpc.BlockNumber = rpc.LatestBlockNumber - if blockNumber != nil { - num = rpc.BlockNumber(blockNumber.Int64()) - } - - state, header, err := a.apiBackend.StateAndHeaderByNumber(ctx, num) - if err != nil { - return nil, err - } - - msg := &core.Message{ - From: call.From, - To: call.To, - Value: big.NewInt(0), - GasLimit: math.MaxUint64, - GasPrice: big.NewInt(0), - GasFeeCap: big.NewInt(0), - GasTipCap: big.NewInt(0), - Data: call.Data, - AccessList: call.AccessList, - SkipAccountChecks: true, - TxRunMode: core.MessageEthcallMode, // Indicate this is an eth_call - SkipL1Charging: true, // Skip L1 data fees - } - - evm := a.apiBackend.GetEVM(ctx, msg, state, header, &vm.Config{NoBaseFee: true}, nil) - gp := new(core.GasPool).AddGas(math.MaxUint64) - result, err := core.ApplyMessage(evm, msg, gp) - if err != nil { - return nil, err - } - - return result.ReturnData, nil +type expressLaneService struct { + stopwaiter.StopWaiter + transactionPublisher transactionPublisher + seqConfig SequencerConfigFetcher + auctionContractAddr common.Address + apiBackend *arbitrum.APIBackend + roundTimingInfo timeboost.RoundTimingInfo + earlySubmissionGrace time.Duration + chainConfig *params.ChainConfig + auctionContract *express_lane_auctiongen.ExpressLaneAuction + roundControl containers.SyncMap[uint64, common.Address] // thread safe + + roundInfoMutex sync.Mutex + roundInfo *containers.LruCache[uint64, *expressLaneRoundInfo] } func newExpressLaneService( transactionPublisher transactionPublisher, + seqConfig SequencerConfigFetcher, apiBackend *arbitrum.APIBackend, filterSystem *filters.FilterSystem, auctionContractAddr common.Address, bc *core.BlockChain, earlySubmissionGrace time.Duration, - txQueueTimeout time.Duration, ) (*expressLaneService, error) { chainConfig := bc.Config() @@ -165,233 +97,278 @@ pending: } return &expressLaneService{ - transactionPublisher: transactionPublisher, - auctionContract: auctionContract, - apiBackend: apiBackend, - chainConfig: chainConfig, - roundTimingInfo: *roundTimingInfo, - earlySubmissionGrace: earlySubmissionGrace, - roundControl: lru.NewCache[uint64, *expressLaneControl](8), // Keep 8 rounds cached. - auctionContractAddr: auctionContractAddr, - logs: make(chan []*types.Log, 10_000), - msgAndResultBySequenceNumber: make(map[uint64]*msgAndResult), - txQueueTimeout: txQueueTimeout, + transactionPublisher: transactionPublisher, + seqConfig: seqConfig, + auctionContract: auctionContract, + apiBackend: apiBackend, + chainConfig: chainConfig, + roundTimingInfo: *roundTimingInfo, + earlySubmissionGrace: earlySubmissionGrace, + auctionContractAddr: auctionContractAddr, + roundInfo: containers.NewLruCache[uint64, *expressLaneRoundInfo](8), }, nil } func (es *expressLaneService) Start(ctxIn context.Context) { es.StopWaiter.Start(ctxIn, es) - // Log every new express lane auction round. es.LaunchThread(func(ctx context.Context) { + // Log every new express lane auction round. log.Info("Watching for new express lane rounds") - waitTime := es.roundTimingInfo.TimeTilNextRound() + // Wait until the next round starts + waitTime := es.roundTimingInfo.TimeTilNextRound() select { case <-ctx.Done(): return case <-time.After(waitTime): - // First tick happened, now set up regular ticks } + // First tick happened, now set up regular ticks ticker := time.NewTicker(es.roundTimingInfo.Round) defer ticker.Stop() - for { + var t time.Time select { case <-ctx.Done(): return - case t := <-ticker.C: - round := es.roundTimingInfo.RoundNumber() - // TODO (BUG?) is there a race here where messages for a new round can come - // in before this tick has been processed? - log.Info( - "New express lane auction round", - "round", round, - "timestamp", t, - ) - es.Lock() - // Reset the sequence numbers map for the new round. - es.msgAndResultBySequenceNumber = make(map[uint64]*msgAndResult) - es.Unlock() + case t = <-ticker.C: } + + round := es.roundTimingInfo.RoundNumber() + // TODO (BUG?) is there a race here where messages for a new round can come + // in before this tick has been processed? + log.Info( + "New express lane auction round", + "round", round, + "timestamp", t, + ) + + // Cleanup previous round controller data + es.roundControl.Delete(round - 1) } }) + es.LaunchThread(func(ctx context.Context) { - log.Info("Monitoring express lane auction contract") // Monitor for auction resolutions from the auction manager smart contract // and set the express lane controller for the upcoming round accordingly. + log.Info("Monitoring express lane auction contract") + + var fromBlock uint64 + maxBlockSpeed := es.seqConfig().MaxBlockSpeed latestBlock, err := es.apiBackend.HeaderByNumber(ctx, rpc.LatestBlockNumber) if err != nil { - // TODO: Should not be a crit. - log.Crit("Could not get latest header", "err", err) + log.Error("ExpressLaneService could not get the latest header", "err", err) + } else { + maxBlocksPerRound := es.roundTimingInfo.Round / maxBlockSpeed + fromBlock = latestBlock.Number.Uint64() + // #nosec G115 + if fromBlock > uint64(maxBlocksPerRound) { + // #nosec G115 + fromBlock -= uint64(maxBlocksPerRound) + } } - fromBlock := latestBlock.Number.Uint64() + + ticker := time.NewTicker(maxBlockSpeed) + defer ticker.Stop() for { select { case <-ctx.Done(): return - case <-time.After(time.Millisecond * 250): - latestBlock, err := es.apiBackend.HeaderByNumber(ctx, rpc.LatestBlockNumber) - if err != nil { - log.Crit("Could not get latest header", "err", err) + case <-ticker.C: + newMaxBlockSpeed := es.seqConfig().MaxBlockSpeed + if newMaxBlockSpeed != maxBlockSpeed { + maxBlockSpeed = newMaxBlockSpeed + ticker.Reset(maxBlockSpeed) } - toBlock := latestBlock.Number.Uint64() - if fromBlock > toBlock { + } + + latestBlock, err := es.apiBackend.HeaderByNumber(ctx, rpc.LatestBlockNumber) + if err != nil { + log.Error("ExpressLaneService could not get the latest header", "err", err) + continue + } + toBlock := latestBlock.Number.Uint64() + if fromBlock > toBlock { + continue + } + filterOpts := &bind.FilterOpts{ + Context: ctx, + Start: fromBlock, + End: &toBlock, + } + + it, err := es.auctionContract.FilterAuctionResolved(filterOpts, nil, nil, nil) + if err != nil { + log.Error("Could not filter auction resolutions event", "error", err) + continue + } + for it.Next() { + log.Info( + "AuctionResolved: New express lane controller assigned", + "round", it.Event.Round, + "controller", it.Event.FirstPriceExpressLaneController, + ) + es.roundControl.Store(it.Event.Round, it.Event.FirstPriceExpressLaneController) + } + + setExpressLaneIterator, err := es.auctionContract.FilterSetExpressLaneController(filterOpts, nil, nil, nil) + if err != nil { + log.Error("Could not filter express lane controller transfer event", "error", err) + continue + } + for setExpressLaneIterator.Next() { + if (setExpressLaneIterator.Event.PreviousExpressLaneController == common.Address{}) { + // The ExpressLaneAuction contract emits both AuctionResolved and SetExpressLaneController + // events when an auction is resolved. They contain redundant information so + // the SetExpressLaneController event can be skipped if it's related to a new round, as + // indicated by an empty PreviousExpressLaneController field (a new round has no + // previous controller). + // It is more explicit and thus clearer to use the AuctionResovled event only for the + // new round setup logic and SetExpressLaneController event only for transfers, rather + // than trying to overload everything onto SetExpressLaneController. continue } - filterOpts := &bind.FilterOpts{ - Context: ctx, - Start: fromBlock, - End: &toBlock, + currentRound := es.roundTimingInfo.RoundNumber() + round := setExpressLaneIterator.Event.Round + if round < currentRound { + log.Info("SetExpressLaneController event's round is lower than current round, not transferring control", "eventRound", round, "currentRound", currentRound) + continue } - it, err := es.auctionContract.FilterAuctionResolved(filterOpts, nil, nil, nil) - if err != nil { - log.Error("Could not filter auction resolutions event", "error", err) + roundController, ok := es.roundControl.Load(round) + if !ok { + log.Warn("Could not find round info for ExpressLaneConroller transfer event", "round", round) continue } - for it.Next() { - log.Info( - "AuctionResolved: New express lane controller assigned", - "round", it.Event.Round, - "controller", it.Event.FirstPriceExpressLaneController, - ) - es.roundControl.Add(it.Event.Round, &expressLaneControl{ - controller: it.Event.FirstPriceExpressLaneController, - sequence: 0, - }) + if roundController != setExpressLaneIterator.Event.PreviousExpressLaneController { + log.Warn("Previous ExpressLaneController in SetExpressLaneController event does not match Sequencer previous controller, continuing with transfer to new controller anyway", + "round", round, + "sequencerRoundController", roundController, + "previous", setExpressLaneIterator.Event.PreviousExpressLaneController, + "new", setExpressLaneIterator.Event.NewExpressLaneController) } - - setExpressLaneIterator, err := es.auctionContract.FilterSetExpressLaneController(filterOpts, nil, nil, nil) - if err != nil { - log.Error("Could not filter express lane controller transfer event", "error", err) + if roundController == setExpressLaneIterator.Event.NewExpressLaneController { + log.Warn("SetExpressLaneController: Previous and New ExpressLaneControllers are the same, not transferring control.", + "round", round, + "previous", roundController, + "new", setExpressLaneIterator.Event.NewExpressLaneController) continue } - - for setExpressLaneIterator.Next() { - if (setExpressLaneIterator.Event.PreviousExpressLaneController == common.Address{}) { - // The ExpressLaneAuction contract emits both AuctionResolved and SetExpressLaneController - // events when an auction is resolved. They contain redundant information so - // the SetExpressLaneController event can be skipped if it's related to a new round, as - // indicated by an empty PreviousExpressLaneController field (a new round has no - // previous controller). - // It is more explicit and thus clearer to use the AuctionResovled event only for the - // new round setup logic and SetExpressLaneController event only for transfers, rather - // than trying to overload everything onto SetExpressLaneController. - continue - } - round := setExpressLaneIterator.Event.Round - roundInfo, ok := es.roundControl.Get(round) - if !ok { - log.Warn("Could not find round info for ExpressLaneConroller transfer event", "round", round) - continue + es.roundControl.Store(round, setExpressLaneIterator.Event.NewExpressLaneController) + if round == currentRound { + es.roundInfoMutex.Lock() + if es.roundInfo.Contains(round) { + es.roundInfo.Add(round, &expressLaneRoundInfo{ + 0, + make(map[uint64]*msgAndResult), + }) } - if roundInfo.controller != setExpressLaneIterator.Event.PreviousExpressLaneController { - log.Warn("Previous ExpressLaneController in SetExpressLaneController event does not match Sequencer previous controller, continuing with transfer to new controller anyway", - "round", round, - "sequencerRoundController", roundInfo.controller, - "previous", setExpressLaneIterator.Event.PreviousExpressLaneController, - "new", setExpressLaneIterator.Event.NewExpressLaneController) - } - if roundInfo.controller == setExpressLaneIterator.Event.NewExpressLaneController { - log.Warn("SetExpressLaneController: Previous and New ExpressLaneControllers are the same, not transferring control.", - "round", round, - "previous", roundInfo.controller, - "new", setExpressLaneIterator.Event.NewExpressLaneController) - continue - } - - es.Lock() - // Changes to roundControl by itself are atomic but we need to udpate both roundControl - // and msgAndResultBySequenceNumber atomically here. - es.roundControl.Add(round, &expressLaneControl{ - controller: setExpressLaneIterator.Event.NewExpressLaneController, - sequence: 0, - }) - // Since the sequence number for this round has been reset to zero, the map of messages - // by sequence number must be reset otherwise old messages would be replayed. - es.msgAndResultBySequenceNumber = make(map[uint64]*msgAndResult) - es.Unlock() + es.roundInfoMutex.Unlock() } - fromBlock = toBlock + 1 } + fromBlock = toBlock + 1 } }) } func (es *expressLaneService) currentRoundHasController() bool { - control, ok := es.roundControl.Get(es.roundTimingInfo.RoundNumber()) + controller, ok := es.roundControl.Load(es.roundTimingInfo.RoundNumber()) if !ok { return false } - return control.controller != (common.Address{}) + return controller != (common.Address{}) } -// Sequence express lane submission skips validation of the express lane message itself, -// as the core validator logic is handled in `validateExpressLaneTx“ +// sequenceExpressLaneSubmission with the roundInfo lock held, validates sequence number and sender address fields of the message +// adds the message to the transaction queue and waits for the response func (es *expressLaneService) sequenceExpressLaneSubmission( ctx context.Context, msg *timeboost.ExpressLaneSubmission, ) error { unlockByDefer := true - es.Lock() + es.roundInfoMutex.Lock() defer func() { if unlockByDefer { - es.Unlock() + es.roundInfoMutex.Unlock() } }() - // Although access to roundControl by itself is thread-safe, when the round control is transferred - // we need to reset roundControl and msgAndResultBySequenceNumber atomically, so the following access - // must be within the lock. - control, ok := es.roundControl.Get(msg.Round) + + // Below code block isn't a repetition, it prevents stale messages to be accepted during control transfer within or after the round ends! + controller, ok := es.roundControl.Load(msg.Round) if !ok { return timeboost.ErrNoOnchainController } + sender, err := msg.Sender() // Doesn't recompute sender address + if err != nil { + return err + } + if sender != controller { + return timeboost.ErrNotExpressLaneController + } + + // If expressLaneRoundInfo for current round doesn't exist yet, we'll add it to the cache + if !es.roundInfo.Contains(msg.Round) { + es.roundInfo.Add(msg.Round, &expressLaneRoundInfo{ + 0, + make(map[uint64]*msgAndResult), + }) + } + roundInfo, _ := es.roundInfo.Get(msg.Round) // Check if the submission nonce is too low. - if msg.SequenceNumber < control.sequence { + if msg.SequenceNumber < roundInfo.sequence { return timeboost.ErrSequenceNumberTooLow } + // Check if a duplicate submission exists already, and reject if so. - if _, exists := es.msgAndResultBySequenceNumber[msg.SequenceNumber]; exists { + if _, exists := roundInfo.msgAndResultBySequenceNumber[msg.SequenceNumber]; exists { return timeboost.ErrDuplicateSequenceNumber } + + seqConfig := es.seqConfig() + // Log an informational warning if the message's sequence number is in the future. - if msg.SequenceNumber > control.sequence { + if msg.SequenceNumber > roundInfo.sequence { + if seqConfig.Timeboost.MaxQueuedTxCount != 0 && + len(roundInfo.msgAndResultBySequenceNumber) >= seqConfig.Timeboost.MaxQueuedTxCount { + return fmt.Errorf("reached limit for queuing of future sequence number transactions, please try again with the correct sequence number. Limit: %d, Current sequence number: %d", seqConfig.Timeboost.MaxQueuedTxCount, roundInfo.sequence) + } log.Info("Received express lane submission with future sequence number", "SequenceNumber", msg.SequenceNumber) } + // Put into the sequence number map. resultChan := make(chan error, 1) - es.msgAndResultBySequenceNumber[msg.SequenceNumber] = &msgAndResult{msg, resultChan} + roundInfo.msgAndResultBySequenceNumber[msg.SequenceNumber] = &msgAndResult{msg, resultChan} now := time.Now() for es.roundTimingInfo.RoundNumber() == msg.Round { // This check ensures that the controller for this round is not allowed to send transactions from msgAndResultBySequenceNumber map once the next round starts // Get the next message in the sequence. - nextMsgAndResult, exists := es.msgAndResultBySequenceNumber[control.sequence] + nextMsgAndResult, exists := roundInfo.msgAndResultBySequenceNumber[roundInfo.sequence] if !exists { break } - delete(es.msgAndResultBySequenceNumber, nextMsgAndResult.msg.SequenceNumber) + delete(roundInfo.msgAndResultBySequenceNumber, nextMsgAndResult.msg.SequenceNumber) txIsQueued := make(chan struct{}) es.LaunchThread(func(ctx context.Context) { nextMsgAndResult.resultChan <- es.transactionPublisher.PublishTimeboostedTransaction(ctx, nextMsgAndResult.msg.Transaction, nextMsgAndResult.msg.Options, txIsQueued) }) <-txIsQueued // Increase the global round sequence number. - control.sequence += 1 + roundInfo.sequence += 1 } - es.roundControl.Add(msg.Round, control) + + es.roundInfo.Add(msg.Round, roundInfo) unlockByDefer = false - es.Unlock() // Release lock so that other timeboost txs can be processed + es.roundInfoMutex.Unlock() // Release lock so that other timeboost txs can be processed - abortCtx, cancel := ctxWithTimeout(ctx, es.txQueueTimeout*2) // We use the same timeout value that sequencer imposes + queueTimeout := seqConfig.QueueTimeout + abortCtx, cancel := ctxWithTimeout(ctx, queueTimeout*2) // We use the same timeout value that sequencer imposes defer cancel() - var err error select { case err = <-resultChan: case <-abortCtx.Done(): if ctx.Err() == nil { - log.Warn("Transaction sequencing hit abort deadline", "err", abortCtx.Err(), "submittedAt", now, "TxProcessingTimeout", es.txQueueTimeout*2, "txHash", msg.Transaction.Hash()) + log.Warn("Transaction sequencing hit abort deadline", "err", abortCtx.Err(), "submittedAt", now, "TxProcessingTimeout", queueTimeout*2, "txHash", msg.Transaction.Hash()) } err = fmt.Errorf("Transaction sequencing hit timeout, result for the submitted transaction is not yet available: %w", abortCtx.Err()) } @@ -402,6 +379,7 @@ func (es *expressLaneService) sequenceExpressLaneSubmission( return nil } +// validateExpressLaneTx checks for the correctness of all fields of msg func (es *expressLaneService) validateExpressLaneTx(msg *timeboost.ExpressLaneSubmission) error { if msg == nil || msg.Transaction == nil || msg.Signature == nil { return timeboost.ErrMalformedData @@ -413,12 +391,8 @@ func (es *expressLaneService) validateExpressLaneTx(msg *timeboost.ExpressLaneSu return errors.Wrapf(timeboost.ErrWrongAuctionContract, "msg auction contract address %s does not match sequencer auction contract address %s", msg.AuctionContractAddress, es.auctionContractAddr) } - for { - currentRound := es.roundTimingInfo.RoundNumber() - if msg.Round == currentRound { - break - } - + currentRound := es.roundTimingInfo.RoundNumber() + if msg.Round != currentRound { timeTilNextRound := es.roundTimingInfo.TimeTilNextRound() // We allow txs to come in for the next round if it is close enough to that round, // but we sleep until the round starts. @@ -428,38 +402,17 @@ func (es *expressLaneService) validateExpressLaneTx(msg *timeboost.ExpressLaneSu return errors.Wrapf(timeboost.ErrBadRoundNumber, "express lane tx round %d does not match current round %d", msg.Round, currentRound) } } - if !es.currentRoundHasController() { + + controller, ok := es.roundControl.Load(msg.Round) + if !ok { return timeboost.ErrNoOnchainController } - // Reconstruct the message being signed over and recover the sender address. - signingMessage, err := msg.ToMessageBytes() + // Extract sender address and cache it to be later used by sequenceExpressLaneSubmission + sender, err := msg.Sender() if err != nil { - return timeboost.ErrMalformedData - } - if len(msg.Signature) != 65 { - return errors.Wrap(timeboost.ErrMalformedData, "signature length is not 65") - } - // Recover the public key. - prefixed := crypto.Keccak256(append([]byte(fmt.Sprintf("\x19Ethereum Signed Message:\n%d", len(signingMessage))), signingMessage...)) - sigItem := make([]byte, len(msg.Signature)) - copy(sigItem, msg.Signature) - - // Signature verification expects the last byte of the signature to have 27 subtracted, - // as it represents the recovery ID. If the last byte is greater than or equal to 27, it indicates a recovery ID that hasn't been adjusted yet, - // it's needed for internal signature verification logic. - if sigItem[len(sigItem)-1] >= 27 { - sigItem[len(sigItem)-1] -= 27 - } - pubkey, err := crypto.SigToPub(prefixed, sigItem) - if err != nil { - return timeboost.ErrMalformedData - } - sender := crypto.PubkeyToAddress(*pubkey) - control, ok := es.roundControl.Get(msg.Round) - if !ok { - return timeboost.ErrNoOnchainController + return err } - if sender != control.controller { + if sender != controller { return timeboost.ErrNotExpressLaneController } return nil diff --git a/execution/gethexec/express_lane_service_test.go b/execution/gethexec/express_lane_service_test.go index 0c52272322..97de8a1d36 100644 --- a/execution/gethexec/express_lane_service_test.go +++ b/execution/gethexec/express_lane_service_test.go @@ -17,12 +17,12 @@ import ( "github.com/ethereum/go-ethereum/arbitrum_types" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/common/lru" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/params" "github.com/offchainlabs/nitro/timeboost" + "github.com/offchainlabs/nitro/util/containers" ) var testPriv, testPriv2 *ecdsa.PrivateKey @@ -55,23 +55,19 @@ func Test_expressLaneService_validateExpressLaneTx(t *testing.T) { es *expressLaneService sub *timeboost.ExpressLaneSubmission expectedErr error - control expressLaneControl + controller common.Address valid bool }{ { - name: "nil msg", - sub: nil, - es: &expressLaneService{ - roundControl: lru.NewCache[uint64, *expressLaneControl](8), - }, + name: "nil msg", + sub: nil, + es: &expressLaneService{}, expectedErr: timeboost.ErrMalformedData, }, { - name: "nil tx", - sub: &timeboost.ExpressLaneSubmission{}, - es: &expressLaneService{ - roundControl: lru.NewCache[uint64, *expressLaneControl](8), - }, + name: "nil tx", + sub: &timeboost.ExpressLaneSubmission{}, + es: &expressLaneService{}, expectedErr: timeboost.ErrMalformedData, }, { @@ -79,9 +75,7 @@ func Test_expressLaneService_validateExpressLaneTx(t *testing.T) { sub: &timeboost.ExpressLaneSubmission{ Transaction: &types.Transaction{}, }, - es: &expressLaneService{ - roundControl: lru.NewCache[uint64, *expressLaneControl](8), - }, + es: &expressLaneService{}, expectedErr: timeboost.ErrMalformedData, }, { @@ -90,7 +84,6 @@ func Test_expressLaneService_validateExpressLaneTx(t *testing.T) { chainConfig: ¶ms.ChainConfig{ ChainID: big.NewInt(1), }, - roundControl: lru.NewCache[uint64, *expressLaneControl](8), }, sub: &timeboost.ExpressLaneSubmission{ ChainId: big.NewInt(2), @@ -106,7 +99,6 @@ func Test_expressLaneService_validateExpressLaneTx(t *testing.T) { chainConfig: ¶ms.ChainConfig{ ChainID: big.NewInt(1), }, - roundControl: lru.NewCache[uint64, *expressLaneControl](8), }, sub: &timeboost.ExpressLaneSubmission{ ChainId: big.NewInt(1), @@ -116,24 +108,6 @@ func Test_expressLaneService_validateExpressLaneTx(t *testing.T) { }, expectedErr: timeboost.ErrWrongAuctionContract, }, - { - name: "no onchain controller", - es: &expressLaneService{ - auctionContractAddr: common.Address{'a'}, - roundTimingInfo: defaultTestRoundTimingInfo(time.Now()), - chainConfig: ¶ms.ChainConfig{ - ChainID: big.NewInt(1), - }, - roundControl: lru.NewCache[uint64, *expressLaneControl](8), - }, - sub: &timeboost.ExpressLaneSubmission{ - ChainId: big.NewInt(1), - AuctionContractAddress: common.Address{'a'}, - Transaction: &types.Transaction{}, - Signature: []byte{'b'}, - }, - expectedErr: timeboost.ErrNoOnchainController, - }, { name: "bad round number", es: &expressLaneService{ @@ -142,11 +116,8 @@ func Test_expressLaneService_validateExpressLaneTx(t *testing.T) { chainConfig: ¶ms.ChainConfig{ ChainID: big.NewInt(1), }, - roundControl: lru.NewCache[uint64, *expressLaneControl](8), - }, - control: expressLaneControl{ - controller: common.Address{'b'}, }, + controller: common.Address{'b'}, sub: &timeboost.ExpressLaneSubmission{ ChainId: big.NewInt(1), AuctionContractAddress: common.Address{'a'}, @@ -164,11 +135,9 @@ func Test_expressLaneService_validateExpressLaneTx(t *testing.T) { chainConfig: ¶ms.ChainConfig{ ChainID: big.NewInt(1), }, - roundControl: lru.NewCache[uint64, *expressLaneControl](8), - }, - control: expressLaneControl{ - controller: common.Address{'b'}, }, + controller: common.Address{'b'}, + sub: &timeboost.ExpressLaneSubmission{ ChainId: big.NewInt(1), AuctionContractAddress: common.Address{'a'}, @@ -186,14 +155,29 @@ func Test_expressLaneService_validateExpressLaneTx(t *testing.T) { chainConfig: ¶ms.ChainConfig{ ChainID: big.NewInt(1), }, - roundControl: lru.NewCache[uint64, *expressLaneControl](8), - }, - control: expressLaneControl{ - controller: common.Address{'b'}, + roundInfo: containers.NewLruCache[uint64, *expressLaneRoundInfo](8), }, + controller: common.Address{'b'}, sub: buildInvalidSignatureSubmission(t, common.HexToAddress("0x2Aef36410182881a4b13664a1E079762D7F716e6")), expectedErr: timeboost.ErrNotExpressLaneController, }, + { + name: "no onchain controller", + es: &expressLaneService{ + auctionContractAddr: common.Address{'a'}, + roundTimingInfo: defaultTestRoundTimingInfo(time.Now()), + chainConfig: ¶ms.ChainConfig{ + ChainID: big.NewInt(1), + }, + }, + sub: &timeboost.ExpressLaneSubmission{ + ChainId: big.NewInt(1), + AuctionContractAddress: common.Address{'a'}, + Transaction: &types.Transaction{}, + Signature: []byte{'b'}, + }, + expectedErr: timeboost.ErrNoOnchainController, + }, { name: "not express lane controller", es: &expressLaneService{ @@ -202,11 +186,9 @@ func Test_expressLaneService_validateExpressLaneTx(t *testing.T) { chainConfig: ¶ms.ChainConfig{ ChainID: big.NewInt(1), }, - roundControl: lru.NewCache[uint64, *expressLaneControl](8), - }, - control: expressLaneControl{ - controller: common.Address{'b'}, + roundInfo: containers.NewLruCache[uint64, *expressLaneRoundInfo](8), }, + controller: common.Address{'b'}, sub: buildValidSubmission(t, common.HexToAddress("0x2Aef36410182881a4b13664a1E079762D7F716e6"), testPriv, 0), expectedErr: timeboost.ErrNotExpressLaneController, }, @@ -218,21 +200,21 @@ func Test_expressLaneService_validateExpressLaneTx(t *testing.T) { chainConfig: ¶ms.ChainConfig{ ChainID: big.NewInt(1), }, - roundControl: lru.NewCache[uint64, *expressLaneControl](8), - }, - control: expressLaneControl{ - controller: crypto.PubkeyToAddress(testPriv.PublicKey), }, - sub: buildValidSubmission(t, common.HexToAddress("0x2Aef36410182881a4b13664a1E079762D7F716e6"), testPriv, 0), - valid: true, + controller: crypto.PubkeyToAddress(testPriv.PublicKey), + sub: buildValidSubmission(t, common.HexToAddress("0x2Aef36410182881a4b13664a1E079762D7F716e6"), testPriv, 0), + valid: true, }, } for _, _tt := range tests { tt := _tt t.Run(tt.name, func(t *testing.T) { - if tt.sub != nil { - tt.es.roundControl.Add(tt.sub.Round, &tt.control) + if tt.es.roundInfo != nil { + tt.es.roundInfo.Add(0, &expressLaneRoundInfo{}) + } + if tt.sub != nil && !errors.Is(tt.expectedErr, timeboost.ErrNoOnchainController) { + tt.es.roundControl.Store(tt.sub.Round, tt.controller) } err := tt.es.validateExpressLaneTx(tt.sub) if tt.valid { @@ -257,14 +239,9 @@ func Test_expressLaneService_validateExpressLaneTx_gracePeriod(t *testing.T) { chainConfig: ¶ms.ChainConfig{ ChainID: big.NewInt(1), }, - roundControl: lru.NewCache[uint64, *expressLaneControl](8), } - es.roundControl.Add(0, &expressLaneControl{ - controller: crypto.PubkeyToAddress(testPriv.PublicKey), - }) - es.roundControl.Add(1, &expressLaneControl{ - controller: crypto.PubkeyToAddress(testPriv2.PublicKey), - }) + es.roundControl.Store(0, crypto.PubkeyToAddress(testPriv.PublicKey)) + es.roundControl.Store(1, crypto.PubkeyToAddress(testPriv2.PublicKey)) sub1 := buildValidSubmission(t, auctionContractAddr, testPriv, 0) err := es.validateExpressLaneTx(sub1) @@ -305,8 +282,7 @@ func (s *stubPublisher) PublishTimeboostedTransaction(parentCtx context.Context, if tx.Hash() != emptyTx.Hash() { return errors.New("oops, bad tx") } - control, _ := s.els.roundControl.Get(0) - s.publishedTxOrder = append(s.publishedTxOrder, control.sequence) + s.publishedTxOrder = append(s.publishedTxOrder, 0) return nil } @@ -315,19 +291,15 @@ func Test_expressLaneService_sequenceExpressLaneSubmission_nonceTooLow(t *testin ctx, cancel := context.WithCancel(context.Background()) defer cancel() els := &expressLaneService{ - msgAndResultBySequenceNumber: make(map[uint64]*msgAndResult), - roundControl: lru.NewCache[uint64, *expressLaneControl](8), + roundInfo: containers.NewLruCache[uint64, *expressLaneRoundInfo](8), } + els.roundInfo.Add(0, &expressLaneRoundInfo{1, make(map[uint64]*msgAndResult)}) els.StopWaiter.Start(ctx, els) + els.roundControl.Store(0, crypto.PubkeyToAddress(testPriv.PublicKey)) stubPublisher := makeStubPublisher(els) els.transactionPublisher = stubPublisher - els.roundControl.Add(0, &expressLaneControl{ - sequence: 1, - }) - msg := &timeboost.ExpressLaneSubmission{ - SequenceNumber: 0, - } + msg := buildValidSubmissionWithSeqAndTx(t, 0, 0, emptyTx) err := els.sequenceExpressLaneSubmission(ctx, msg) require.ErrorIs(t, err, timeboost.ErrSequenceNumberTooLow) } @@ -336,20 +308,17 @@ func Test_expressLaneService_sequenceExpressLaneSubmission_duplicateNonce(t *tes ctx, cancel := context.WithCancel(context.Background()) defer cancel() els := &expressLaneService{ - roundControl: lru.NewCache[uint64, *expressLaneControl](8), - msgAndResultBySequenceNumber: make(map[uint64]*msgAndResult), - roundTimingInfo: defaultTestRoundTimingInfo(time.Now()), + roundInfo: containers.NewLruCache[uint64, *expressLaneRoundInfo](8), + roundTimingInfo: defaultTestRoundTimingInfo(time.Now()), + seqConfig: func() *SequencerConfig { return &SequencerConfig{} }, } + els.roundInfo.Add(0, &expressLaneRoundInfo{1, make(map[uint64]*msgAndResult)}) els.StopWaiter.Start(ctx, els) + els.roundControl.Store(0, crypto.PubkeyToAddress(testPriv.PublicKey)) stubPublisher := makeStubPublisher(els) els.transactionPublisher = stubPublisher - els.roundControl.Add(0, &expressLaneControl{ - sequence: 1, - }) - msg := &timeboost.ExpressLaneSubmission{ - SequenceNumber: 2, - Transaction: types.NewTx(&types.DynamicFeeTx{Data: []byte{1}}), - } + + msg := buildValidSubmissionWithSeqAndTx(t, 0, 2, types.NewTx(&types.DynamicFeeTx{Data: []byte{1}})) var wg sync.WaitGroup wg.Add(3) // We expect only of the below two to return with an error here var err1, err2 error @@ -379,40 +348,24 @@ func Test_expressLaneService_sequenceExpressLaneSubmission_outOfOrder(t *testing ctx, cancel := context.WithCancel(context.Background()) defer cancel() els := &expressLaneService{ - roundControl: lru.NewCache[uint64, *expressLaneControl](8), - msgAndResultBySequenceNumber: make(map[uint64]*msgAndResult), - roundTimingInfo: defaultTestRoundTimingInfo(time.Now()), + roundInfo: containers.NewLruCache[uint64, *expressLaneRoundInfo](8), + roundTimingInfo: defaultTestRoundTimingInfo(time.Now()), + seqConfig: func() *SequencerConfig { return &SequencerConfig{} }, } + els.roundInfo.Add(0, &expressLaneRoundInfo{1, make(map[uint64]*msgAndResult)}) els.StopWaiter.Start(ctx, els) + els.roundControl.Store(0, crypto.PubkeyToAddress(testPriv.PublicKey)) stubPublisher := makeStubPublisher(els) els.transactionPublisher = stubPublisher - els.roundControl.Add(0, &expressLaneControl{ - sequence: 1, - }) - messages := []*timeboost.ExpressLaneSubmission{ - { - SequenceNumber: 10, - Transaction: types.NewTransaction(0, common.MaxAddress, big.NewInt(0), 0, big.NewInt(0), []byte{1}), - }, - { - SequenceNumber: 5, - Transaction: emptyTx, - }, - { - SequenceNumber: 1, - Transaction: emptyTx, - }, - { - SequenceNumber: 4, - Transaction: emptyTx, - }, - { - SequenceNumber: 2, - Transaction: emptyTx, - }, + buildValidSubmissionWithSeqAndTx(t, 0, 10, types.NewTransaction(0, common.MaxAddress, big.NewInt(0), 0, big.NewInt(0), []byte{1})), + buildValidSubmissionWithSeqAndTx(t, 0, 5, emptyTx), + buildValidSubmissionWithSeqAndTx(t, 0, 1, emptyTx), + buildValidSubmissionWithSeqAndTx(t, 0, 4, emptyTx), + buildValidSubmissionWithSeqAndTx(t, 0, 2, emptyTx), } + // We launch 5 goroutines out of which 2 would return with a result hence we initially add a delta of 7 var wg sync.WaitGroup wg.Add(7) @@ -431,53 +384,42 @@ func Test_expressLaneService_sequenceExpressLaneSubmission_outOfOrder(t *testing // We should have only published 2, as we are missing sequence number 3. time.Sleep(2 * time.Second) require.Equal(t, 2, len(stubPublisher.publishedTxOrder)) - els.Lock() - require.Equal(t, 3, len(els.msgAndResultBySequenceNumber)) // Processed txs are deleted - els.Unlock() + els.roundInfoMutex.Lock() + roundInfo, _ := els.roundInfo.Get(0) + require.Equal(t, 3, len(roundInfo.msgAndResultBySequenceNumber)) // Processed txs are deleted + els.roundInfoMutex.Unlock() wg.Add(2) // 4 & 5 should be able to get in after 3 so we add a delta of 2 - err := els.sequenceExpressLaneSubmission(ctx, &timeboost.ExpressLaneSubmission{SequenceNumber: 3, Transaction: emptyTx}) + err := els.sequenceExpressLaneSubmission(ctx, buildValidSubmissionWithSeqAndTx(t, 0, 3, emptyTx)) require.NoError(t, err) wg.Wait() require.Equal(t, 5, len(stubPublisher.publishedTxOrder)) - els.Lock() - require.Equal(t, 1, len(els.msgAndResultBySequenceNumber)) // Tx with seq num 10 should still be present - els.Unlock() + els.roundInfoMutex.Lock() + roundInfo, _ = els.roundInfo.Get(0) + require.Equal(t, 1, len(roundInfo.msgAndResultBySequenceNumber)) // Tx with seq num 10 should still be present + els.roundInfoMutex.Unlock() } func Test_expressLaneService_sequenceExpressLaneSubmission_erroredTx(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() els := &expressLaneService{ - roundControl: lru.NewCache[uint64, *expressLaneControl](8), - msgAndResultBySequenceNumber: make(map[uint64]*msgAndResult), - roundTimingInfo: defaultTestRoundTimingInfo(time.Now()), + roundInfo: containers.NewLruCache[uint64, *expressLaneRoundInfo](8), + roundTimingInfo: defaultTestRoundTimingInfo(time.Now()), + seqConfig: func() *SequencerConfig { return &SequencerConfig{} }, } + els.roundInfo.Add(0, &expressLaneRoundInfo{1, make(map[uint64]*msgAndResult)}) els.StopWaiter.Start(ctx, els) - els.roundControl.Add(0, &expressLaneControl{ - sequence: 1, - }) + els.roundControl.Store(0, crypto.PubkeyToAddress(testPriv.PublicKey)) stubPublisher := makeStubPublisher(els) els.transactionPublisher = stubPublisher messages := []*timeboost.ExpressLaneSubmission{ - { - SequenceNumber: 1, - Transaction: emptyTx, - }, - { - SequenceNumber: 2, - Transaction: types.NewTransaction(0, common.MaxAddress, big.NewInt(0), 0, big.NewInt(0), []byte{1}), - }, - { - SequenceNumber: 3, - Transaction: emptyTx, - }, - { - SequenceNumber: 4, - Transaction: emptyTx, - }, + buildValidSubmissionWithSeqAndTx(t, 0, 1, emptyTx), + buildValidSubmissionWithSeqAndTx(t, 0, 2, types.NewTransaction(0, common.MaxAddress, big.NewInt(0), 0, big.NewInt(0), []byte{1})), + buildValidSubmissionWithSeqAndTx(t, 0, 3, emptyTx), + buildValidSubmissionWithSeqAndTx(t, 0, 4, emptyTx), } for _, msg := range messages { if msg.Transaction.Hash() != emptyTx.Hash() { @@ -488,13 +430,12 @@ func Test_expressLaneService_sequenceExpressLaneSubmission_erroredTx(t *testing. require.NoError(t, err) } } + // One tx out of the four should have failed, so we should have only published 3. // Since sequence number 2 failed after submission stage, that nonce is used up require.Equal(t, 3, len(stubPublisher.publishedTxOrder)) - require.Equal(t, []uint64{1, 3, 4}, stubPublisher.publishedTxOrder) } -// TODO this test is just for RoundTimingInfo func TestIsWithinAuctionCloseWindow(t *testing.T) { initialTimestamp := time.Date(2024, 8, 8, 15, 0, 0, 0, time.UTC) roundTimingInfo := defaultTestRoundTimingInfo(initialTimestamp) @@ -547,15 +488,14 @@ func Benchmark_expressLaneService_validateExpressLaneTx(b *testing.B) { es := &expressLaneService{ auctionContractAddr: common.HexToAddress("0x2Aef36410182881a4b13664a1E079762D7F716e6"), roundTimingInfo: defaultTestRoundTimingInfo(time.Now()), - roundControl: lru.NewCache[uint64, *expressLaneControl](8), + roundInfo: containers.NewLruCache[uint64, *expressLaneRoundInfo](8), chainConfig: ¶ms.ChainConfig{ ChainID: big.NewInt(1), }, } - es.roundControl.Add(0, &expressLaneControl{ - sequence: 1, - controller: addr, - }) + es.roundControl.Store(0, addr) + es.roundInfo.Add(0, &expressLaneRoundInfo{1, make(map[uint64]*msgAndResult)}) + sub := buildValidSubmission(b, common.HexToAddress("0x2Aef36410182881a4b13664a1E079762D7F716e6"), testPriv, 0) b.StartTimer() for i := 0; i < b.N; i++ { @@ -621,3 +561,25 @@ func buildValidSubmission( b.Signature = signature return b } + +func buildValidSubmissionWithSeqAndTx( + t testing.TB, + round uint64, + seq uint64, + tx *types.Transaction, +) *timeboost.ExpressLaneSubmission { + b := &timeboost.ExpressLaneSubmission{ + ChainId: big.NewInt(1), + AuctionContractAddress: common.HexToAddress("0x2Aef36410182881a4b13664a1E079762D7F716e6"), + Transaction: tx, + Signature: make([]byte, 65), + Round: round, + SequenceNumber: seq, + } + data, err := b.ToMessageBytes() + require.NoError(t, err) + signature, err := buildSignature(testPriv, data) + require.NoError(t, err) + b.Signature = signature + return b +} diff --git a/execution/gethexec/node.go b/execution/gethexec/node.go index babb03058f..890fa80de2 100644 --- a/execution/gethexec/node.go +++ b/execution/gethexec/node.go @@ -361,6 +361,19 @@ func (n *ExecutionNode) Initialize(ctx context.Context) error { if err != nil { return fmt.Errorf("error setting sync backend: %w", err) } + if config.Sequencer.Enable && config.Sequencer.Timeboost.Enable { + err := n.Sequencer.InitializeExpressLaneService( + n.Backend.APIBackend(), + n.FilterSystem, + common.HexToAddress(config.Sequencer.Timeboost.AuctionContractAddress), + common.HexToAddress(config.Sequencer.Timeboost.AuctioneerAddress), + config.Sequencer.Timeboost.EarlySubmissionGrace, + ) + if err != nil { + return fmt.Errorf("failed to create express lane service. err: %w", err) + } + } + return nil } diff --git a/execution/gethexec/sequencer.go b/execution/gethexec/sequencer.go index aba8faae33..4c35277463 100644 --- a/execution/gethexec/sequencer.go +++ b/execution/gethexec/sequencer.go @@ -91,6 +91,7 @@ type TimeboostConfig struct { ExpressLaneAdvantage time.Duration `koanf:"express-lane-advantage"` SequencerHTTPEndpoint string `koanf:"sequencer-http-endpoint"` EarlySubmissionGrace time.Duration `koanf:"early-submission-grace"` + MaxQueuedTxCount int `koanf:"max-queued-tx-count"` } var DefaultTimeboostConfig = TimeboostConfig{ @@ -100,6 +101,7 @@ var DefaultTimeboostConfig = TimeboostConfig{ ExpressLaneAdvantage: time.Millisecond * 200, SequencerHTTPEndpoint: "http://localhost:8547", EarlySubmissionGrace: time.Second * 2, + MaxQueuedTxCount: 10, } func (c *SequencerConfig) Validate() error { @@ -194,6 +196,7 @@ func TimeboostAddOptions(prefix string, f *flag.FlagSet) { f.Duration(prefix+".express-lane-advantage", DefaultTimeboostConfig.ExpressLaneAdvantage, "specify the express lane advantage") f.String(prefix+".sequencer-http-endpoint", DefaultTimeboostConfig.SequencerHTTPEndpoint, "this sequencer's http endpoint") f.Duration(prefix+".early-submission-grace", DefaultTimeboostConfig.EarlySubmissionGrace, "period of time before the next round where submissions for the next round will be queued") + f.Int(prefix+".max-queued-tx-count", DefaultTimeboostConfig.MaxQueuedTxCount, "maximum allowed number of express lane txs with future sequence number to be queued. Set 0 to disable this check and a negative value to prevent queuing of any future sequence number transactions") } type txQueueItem struct { @@ -1190,6 +1193,30 @@ func (s *Sequencer) Initialize(ctx context.Context) error { return nil } +func (s *Sequencer) InitializeExpressLaneService( + apiBackend *arbitrum.APIBackend, + filterSystem *filters.FilterSystem, + auctionContractAddr common.Address, + auctioneerAddr common.Address, + earlySubmissionGrace time.Duration, +) error { + els, err := newExpressLaneService( + s, + s.config, + apiBackend, + filterSystem, + auctionContractAddr, + s.execEngine.bc, + earlySubmissionGrace, + ) + if err != nil { + return fmt.Errorf("failed to create express lane service. auctionContractAddr: %v err: %w", auctionContractAddr, err) + } + s.auctioneerAddr = auctioneerAddr + s.expressLaneService = els + return nil +} + var ( usableBytesInBlob = big.NewInt(int64(len(kzg4844.Blob{}) * 31 / 32)) blobTxBlobGasPerBlob = big.NewInt(params.BlobTxBlobGasPerBlob) @@ -1235,6 +1262,12 @@ func (s *Sequencer) updateExpectedSurplus(ctx context.Context) (int64, error) { return expectedSurplus, nil } +func (s *Sequencer) StartExpressLaneService(ctx context.Context) { + if s.expressLaneService != nil { + s.expressLaneService.Start(ctx) + } +} + func (s *Sequencer) Start(ctxIn context.Context) error { s.StopWaiter.Start(ctxIn, s) config := s.config() @@ -1300,36 +1333,9 @@ func (s *Sequencer) Start(ctxIn context.Context) error { return 0 }) - return nil -} - -func (s *Sequencer) StartExpressLane( - ctx context.Context, - apiBackend *arbitrum.APIBackend, - filterSystem *filters.FilterSystem, - auctionContractAddr common.Address, - auctioneerAddr common.Address, - earlySubmissionGrace time.Duration, -) { - if !s.config().Timeboost.Enable { - log.Crit("Timeboost is not enabled, but StartExpressLane was called") - } + s.StartExpressLaneService(ctxIn) - els, err := newExpressLaneService( - s, - apiBackend, - filterSystem, - auctionContractAddr, - s.execEngine.bc, - earlySubmissionGrace, - s.config().QueueTimeout, - ) - if err != nil { - log.Crit("Failed to create express lane service", "err", err, "auctionContractAddr", auctionContractAddr) - } - s.auctioneerAddr = auctioneerAddr - s.expressLaneService = els - s.expressLaneService.Start(ctx) + return nil } func (s *Sequencer) StopAndWait() { diff --git a/system_tests/timeboost_test.go b/system_tests/timeboost_test.go index 05b4a773dc..edab8a10f0 100644 --- a/system_tests/timeboost_test.go +++ b/system_tests/timeboost_test.go @@ -1253,7 +1253,9 @@ func setupExpressLaneAuction( // This is hacky- we are manually starting the ExpressLaneService here instead of letting it be started // by the sequencer. This is due to needing to deploy the auction contract first. builderSeq.execConfig.Sequencer.Timeboost.Enable = true - builderSeq.L2.ExecNode.Sequencer.StartExpressLane(ctx, builderSeq.L2.ExecNode.Backend.APIBackend(), builderSeq.L2.ExecNode.FilterSystem, proxyAddr, seqInfo.GetAddress("AuctionContract"), gethexec.DefaultTimeboostConfig.EarlySubmissionGrace) + err = builderSeq.L2.ExecNode.Sequencer.InitializeExpressLaneService(builderSeq.L2.ExecNode.Backend.APIBackend(), builderSeq.L2.ExecNode.FilterSystem, proxyAddr, seqInfo.GetAddress("AuctionContract"), gethexec.DefaultTimeboostConfig.EarlySubmissionGrace) + Require(t, err) + builderSeq.L2.ExecNode.Sequencer.StartExpressLaneService(ctx) t.Log("Started express lane service in sequencer") // Set up an autonomous auction contract service that runs in the background in this test. diff --git a/timeboost/types.go b/timeboost/types.go index 73e2e0d2b6..01a60b8484 100644 --- a/timeboost/types.go +++ b/timeboost/types.go @@ -3,8 +3,11 @@ package timeboost import ( "bytes" "encoding/binary" + "fmt" "math/big" + "github.com/pkg/errors" + "github.com/ethereum/go-ethereum/arbitrum_types" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" @@ -176,6 +179,8 @@ type ExpressLaneSubmission struct { Options *arbitrum_types.ConditionalOptions `json:"options"` SequenceNumber uint64 Signature []byte + + sender common.Address } func JsonSubmissionToGo(submission *JsonExpressLaneSubmission) (*ExpressLaneSubmission, error) { @@ -229,6 +234,36 @@ func (els *ExpressLaneSubmission) ToMessageBytes() ([]byte, error) { return buf.Bytes(), nil } +func (els *ExpressLaneSubmission) Sender() (common.Address, error) { + if (els.sender != common.Address{}) { + return els.sender, nil + } + // Reconstruct the message being signed over and recover the sender address. + signingMessage, err := els.ToMessageBytes() + if err != nil { + return common.Address{}, ErrMalformedData + } + if len(els.Signature) != 65 { + return common.Address{}, errors.Wrap(ErrMalformedData, "signature length is not 65") + } + // Recover the public key. + prefixed := crypto.Keccak256(append([]byte(fmt.Sprintf("\x19Ethereum Signed Message:\n%d", len(signingMessage))), signingMessage...)) + sigItem := make([]byte, len(els.Signature)) + copy(sigItem, els.Signature) + // Signature verification expects the last byte of the signature to have 27 subtracted, + // as it represents the recovery ID. If the last byte is greater than or equal to 27, it indicates a recovery ID that hasn't been adjusted yet, + // it's needed for internal signature verification logic. + if sigItem[len(sigItem)-1] >= 27 { + sigItem[len(sigItem)-1] -= 27 + } + pubkey, err := crypto.SigToPub(prefixed, sigItem) + if err != nil { + return common.Address{}, ErrMalformedData + } + els.sender = crypto.PubkeyToAddress(*pubkey) + return els.sender, nil +} + // Helper function to pad a big integer to 32 bytes func padBigInt(bi *big.Int) []byte { bb := bi.Bytes()