From bf02dac460d3e72e3b303a9a0010b370cb090c12 Mon Sep 17 00:00:00 2001 From: serverConnected Date: Wed, 20 Nov 2024 17:58:59 +0530 Subject: [PATCH 01/22] L1DepositStore - wip --- admin/admin.go | 55 ++++++++++++++++++++++++- admin/admin_test.go | 4 +- opsimulator/deposits.go | 3 +- opsimulator/deposits_test.go | 3 +- opsimulator/opsimulator.go | 13 +++++- opsimulator/store.go | 63 +++++++++++++++++++++++++++++ opsimulator/store_test.go | 28 +++++++++++++ orchestrator/orchestrator.go | 6 ++- supersim_test.go | 78 ++++++++++++++++++++++++++++++++++++ 9 files changed, 243 insertions(+), 10 deletions(-) create mode 100644 opsimulator/store.go create mode 100644 opsimulator/store_test.go diff --git a/admin/admin.go b/admin/admin.go index abe678757..977a3cd3f 100644 --- a/admin/admin.go +++ b/admin/admin.go @@ -10,6 +10,7 @@ import ( "github.com/ethereum-optimism/supersim/config" "github.com/ethereum-optimism/supersim/interop" + "github.com/ethereum-optimism/supersim/opsimulator" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/log" @@ -26,6 +27,7 @@ type AdminServer struct { networkConfig *config.NetworkConfig l2ToL2MsgIndexer *interop.L2ToL2MessageIndexer + l1DepositStore *opsimulator.L1DepositStoreManager port uint64 } @@ -34,6 +36,7 @@ type RPCMethods struct { log log.Logger networkConfig *config.NetworkConfig l2ToL2MsgIndexer *interop.L2ToL2MessageIndexer + l1DepositStore *opsimulator.L1DepositStoreManager } type JSONRPCError struct { @@ -50,6 +53,17 @@ type JSONL2ToL2Message struct { Message hexutil.Bytes `json:"Message"` } +type JSONDepositTx struct { + SourceHash common.Hash `json:"SourceHash"` + From common.Address `json:"From"` + To *common.Address `json:"To"` + Mint *big.Int `json:"Mint"` + Value *big.Int `json:"Value"` + Gas uint64 `json:"Gas"` + IsSystemTransaction bool `json:"IsSystemTransaction"` + Data hexutil.Bytes `json:"Data"` +} + func (e *JSONRPCError) Error() string { return e.Message } @@ -58,9 +72,9 @@ func (err *JSONRPCError) ErrorCode() int { return err.Code } -func NewAdminServer(log log.Logger, port uint64, networkConfig *config.NetworkConfig, indexer *interop.L2ToL2MessageIndexer) *AdminServer { +func NewAdminServer(log log.Logger, port uint64, networkConfig *config.NetworkConfig, indexer *interop.L2ToL2MessageIndexer, l1DepositStore *opsimulator.L1DepositStoreManager) *AdminServer { - adminServer := &AdminServer{log: log, port: port, networkConfig: networkConfig} + adminServer := &AdminServer{log: log, port: port, networkConfig: networkConfig, l1DepositStore: l1DepositStore} if networkConfig.InteropEnabled && indexer != nil { adminServer.l2ToL2MsgIndexer = indexer @@ -140,6 +154,7 @@ func (s *AdminServer) setupRouter() *gin.Engine { log: s.log, networkConfig: s.networkConfig, l2ToL2MsgIndexer: s.l2ToL2MsgIndexer, + l1DepositStore: s.l1DepositStore, } if err := rpcServer.RegisterName("admin", rpcMethods); err != nil { @@ -223,3 +238,39 @@ func (m *RPCMethods) GetL2ToL2MessageByMsgHash(args *common.Hash) (*JSONL2ToL2Me Message: msg.Message, }, nil } + +func (m *RPCMethods) GetL1ToL2MessageByTxnHash(args *common.Hash) (*JSONDepositTx, error) { + if m.l1DepositStore == nil { + return nil, &JSONRPCError{ + Code: -32601, + Message: "L1DepositStoreManager is not initialized.", + } + } + + if (args == nil || args == &common.Hash{}) { + return nil, &JSONRPCError{ + Code: -32602, + Message: "Valid msg hash not provided", + } + } + + storeEntry, err := m.l1DepositStore.Get(*args) + + if err != nil { + return nil, &JSONRPCError{ + Code: -32603, + Message: fmt.Sprintf("Failed to get message: %v", err), + } + } + + return &JSONDepositTx{ + SourceHash: storeEntry.SourceHash, + From: storeEntry.From, + To: storeEntry.To, + Mint: storeEntry.Mint, + Value: storeEntry.Value, + Gas: storeEntry.Gas, + IsSystemTransaction: storeEntry.IsSystemTransaction, + Data: storeEntry.Data, + }, nil +} diff --git a/admin/admin_test.go b/admin/admin_test.go index 068500176..0b10ed25b 100644 --- a/admin/admin_test.go +++ b/admin/admin_test.go @@ -23,7 +23,7 @@ func TestAdminServerBasicFunctionality(t *testing.T) { testlog := testlog.Logger(t, log.LevelInfo) ctx, cancel := context.WithCancel(context.Background()) - adminServer := NewAdminServer(testlog, 0, &networkConfig, nil) + adminServer := NewAdminServer(testlog, 0, &networkConfig, nil, nil) t.Cleanup(func() { cancel() }) require.NoError(t, adminServer.Start(ctx)) @@ -46,7 +46,7 @@ func TestGetL1AddressesRPC(t *testing.T) { testlog := testlog.Logger(t, log.LevelInfo) ctx, cancel := context.WithCancel(context.Background()) - adminServer := NewAdminServer(testlog, 0, &networkConfig, nil) + adminServer := NewAdminServer(testlog, 0, &networkConfig, nil, nil) t.Cleanup(func() { cancel() }) require.NoError(t, adminServer.Start(ctx)) diff --git a/opsimulator/deposits.go b/opsimulator/deposits.go index fd14ce3fc..2e831b995 100644 --- a/opsimulator/deposits.go +++ b/opsimulator/deposits.go @@ -36,7 +36,7 @@ type LogSubscriber interface { } // transforms Deposit event logs into DepositTx -func SubscribeDepositTx(ctx context.Context, logSub LogSubscriber, depositContractAddr common.Address, ch chan<- *types.DepositTx) (ethereum.Subscription, error) { +func SubscribeDepositTx(ctx context.Context, logSub LogSubscriber, depositContractAddr common.Address, ch chan<- *types.DepositTx, chLog chan<- *types.Log) (ethereum.Subscription, error) { logCh := make(chan types.Log) filterQuery := ethereum.FilterQuery{Addresses: []common.Address{depositContractAddr}, Topics: [][]common.Hash{{derive.DepositEventABIHash}}} logSubscription, err := logSub.SubscribeFilterLogs(ctx, filterQuery, logCh) @@ -61,6 +61,7 @@ func SubscribeDepositTx(ctx context.Context, logSub LogSubscriber, depositContra continue } ch <- dep + chLog <- &log case err := <-logErrCh: errCh <- fmt.Errorf("log subscription error: %w", err) case <-ctx.Done(): diff --git a/opsimulator/deposits_test.go b/opsimulator/deposits_test.go index ee4258b1d..6da688417 100644 --- a/opsimulator/deposits_test.go +++ b/opsimulator/deposits_test.go @@ -62,8 +62,9 @@ func TestSubscribeDepositTx(t *testing.T) { ctx := context.Background() depositTxCh := make(chan *types.DepositTx, len(mockDepositTxs)) + depositLogCh := make(chan *types.Log, len(mockDepositTxs)) - sub, err := SubscribeDepositTx(ctx, &chain, common.HexToAddress(""), depositTxCh) + sub, err := SubscribeDepositTx(ctx, &chain, common.HexToAddress(""), depositTxCh, depositLogCh) if err != nil { require.NoError(t, err) } diff --git a/opsimulator/opsimulator.go b/opsimulator/opsimulator.go index 19c4d1bc6..c1e5f66ce 100644 --- a/opsimulator/opsimulator.go +++ b/opsimulator/opsimulator.go @@ -59,10 +59,12 @@ type OpSimulator struct { ethClient *ethclient.Client stopped atomic.Bool + + l1DepositStoreManager *L1DepositStoreManager } // OpSimulator wraps around the l2 chain. By embedding `Chain`, it also implements the same inteface -func New(log log.Logger, closeApp context.CancelCauseFunc, port uint64, l1Chain, l2Chain config.Chain, peers map[uint64]config.Chain, interopDelay uint64) *OpSimulator { +func New(log log.Logger, closeApp context.CancelCauseFunc, port uint64, l1Chain, l2Chain config.Chain, peers map[uint64]config.Chain, interopDelay uint64, depositStoreManager *L1DepositStoreManager) *OpSimulator { bgTasksCtx, bgTasksCancel := context.WithCancel(context.Background()) crossL2Inbox, err := bindings.NewCrossL2Inbox(predeploys.CrossL2InboxAddr, l2Chain.EthClient()) @@ -88,6 +90,8 @@ func New(log log.Logger, closeApp context.CancelCauseFunc, port uint64, l1Chain, }, }, + l1DepositStoreManager: depositStoreManager, + peers: peers, } } @@ -142,8 +146,9 @@ func (opSim *OpSimulator) startBackgroundTasks() { // Relay deposit tx from L1 to L2 opSim.bgTasks.Go(func() error { depositTxCh := make(chan *types.DepositTx) + l1DepositTxnLogCh := make(chan *types.Log) portalAddress := common.Address(opSim.Config().L2Config.L1Addresses.OptimismPortalProxy) - sub, err := SubscribeDepositTx(context.Background(), opSim.l1Chain.EthClient(), portalAddress, depositTxCh) + sub, err := SubscribeDepositTx(context.Background(), opSim.l1Chain.EthClient(), portalAddress, depositTxCh, l1DepositTxnLogCh) if err != nil { return fmt.Errorf("failed to subscribe to deposit tx: %w", err) } @@ -154,6 +159,7 @@ func (opSim *OpSimulator) startBackgroundTasks() { select { case dep := <-depositTxCh: depTx := types.NewTx(dep) + l1Log := <-l1DepositTxnLogCh opSim.log.Debug("observed deposit event on L1", "hash", depTx.Hash().String()) clnt := opSim.Chain.EthClient() @@ -162,6 +168,9 @@ func (opSim *OpSimulator) startBackgroundTasks() { } opSim.log.Info("OptimismPortal#depositTransaction", "l2TxHash", depTx.Hash().String()) + if err := opSim.l1DepositStoreManager.Set(l1Log.TxHash, dep); err != nil { + opSim.log.Error("failed to store deposit tx to chain: %w", "chain.id", chainId, "err", err) + } case <-opSim.bgTasksCtx.Done(): sub.Unsubscribe() diff --git a/opsimulator/store.go b/opsimulator/store.go new file mode 100644 index 000000000..f97e3626e --- /dev/null +++ b/opsimulator/store.go @@ -0,0 +1,63 @@ +package opsimulator + +import ( + "fmt" + "sync" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" +) + +type L1DepositStore struct { + entryByHash map[common.Hash]*types.DepositTx + mu sync.RWMutex +} + +type L1DepositStoreManager struct { + store *L1DepositStore +} + +func NewL1DepositStore() *L1DepositStore { + return &L1DepositStore{ + entryByHash: make(map[common.Hash]*types.DepositTx), + } +} + +func NewL1DepositStoreManager() *L1DepositStoreManager { + return &L1DepositStoreManager{ + store: NewL1DepositStore(), + } +} + +func (s *L1DepositStore) Set(txnHash common.Hash, entry *types.DepositTx) error { + s.mu.Lock() + defer s.mu.Unlock() + + s.entryByHash[txnHash] = entry + return nil +} + +func (s *L1DepositStore) Get(txnHash common.Hash) (*types.DepositTx, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + entry, exists := s.entryByHash[txnHash] + + if !exists { + return nil, fmt.Errorf("Deposit txn not found") + } + + return entry, nil +} + +func (s *L1DepositStoreManager) Get(txnHash common.Hash) (*types.DepositTx, error) { + return s.store.Get(txnHash) +} + +func (s *L1DepositStoreManager) Set(txnHash common.Hash, entry *types.DepositTx) error { + if err := s.store.Set(txnHash, entry); err != nil { + return fmt.Errorf("failed to store message: %w", err) + } + + return nil +} diff --git a/opsimulator/store_test.go b/opsimulator/store_test.go new file mode 100644 index 000000000..f56133dd0 --- /dev/null +++ b/opsimulator/store_test.go @@ -0,0 +1,28 @@ +package opsimulator + +import ( + "math/rand" + "testing" + + optestutils "github.com/ethereum-optimism/optimism/op-service/testutils" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/stretchr/testify/assert" +) + +func TestL1DepositStore_SetAndGet(t *testing.T) { + sm := NewL1DepositStoreManager() + + rng := rand.New(rand.NewSource(int64(0))) + sourceHash := common.Hash{} + depInput := optestutils.GenerateDeposit(sourceHash, rng) + depTx := types.NewTx(depInput) + txnHash := depTx.Hash() + + err := sm.store.Set(txnHash, depInput) + assert.NoError(t, err, "expect no error while store deposit txn ref") + + retrievedEntry, err := sm.store.Get(txnHash) + assert.NoError(t, err, "expected no error when getting entry from store") + assert.Equal(t, depInput, retrievedEntry, "expected retrieved entry to equal stored entry") +} diff --git a/orchestrator/orchestrator.go b/orchestrator/orchestrator.go index 5cd6cdaf7..82988ca20 100644 --- a/orchestrator/orchestrator.go +++ b/orchestrator/orchestrator.go @@ -49,10 +49,12 @@ func NewOrchestrator(log log.Logger, closeApp context.CancelCauseFunc, networkCo l2Anvils[cfg.ChainID] = l2Anvil } + depositStoreMngr := opsimulator.NewL1DepositStoreManager() + // Sping up OpSim to fornt the L2 instances for i := range networkConfig.L2Configs { cfg := networkConfig.L2Configs[i] - l2OpSims[cfg.ChainID] = opsimulator.New(log, closeApp, nextL2Port, l1Anvil, l2Anvils[cfg.ChainID], l2Anvils, networkConfig.InteropDelay) + l2OpSims[cfg.ChainID] = opsimulator.New(log, closeApp, nextL2Port, l1Anvil, l2Anvils[cfg.ChainID], l2Anvils, networkConfig.InteropDelay, depositStoreMngr) // only increment expected port if it has been specified if nextL2Port > 0 { @@ -70,7 +72,7 @@ func NewOrchestrator(log log.Logger, closeApp context.CancelCauseFunc, networkCo } } - a := admin.NewAdminServer(log, adminPort, networkConfig, o.l2ToL2MsgIndexer) + a := admin.NewAdminServer(log, adminPort, networkConfig, o.l2ToL2MsgIndexer, depositStoreMngr) o.AdminServer = a diff --git a/supersim_test.go b/supersim_test.go index 841d37d7a..b50824c83 100644 --- a/supersim_test.go +++ b/supersim_test.go @@ -74,6 +74,17 @@ type JSONL2ToL2Message struct { Message hexutil.Bytes `json:"Message"` } +type JSONDepositTx struct { + SourceHash common.Hash `json:"SourceHash"` + From common.Address `json:"From"` + To *common.Address `json:"To"` + Mint *big.Int `json:"Mint"` + Value *big.Int `json:"Value"` + Gas uint64 `json:"Gas"` + IsSystemTransaction bool `json:"IsSystemTransaction"` + Data hexutil.Bytes `json:"Data"` +} + type InteropTestSuite struct { t *testing.T @@ -1089,3 +1100,70 @@ func TestAdminGetL2ToL2MessageByMsgHash(t *testing.T) { assert.Equal(t, tx.To().String(), message.Target.String()) assert.Equal(t, tx.To().String(), message.Sender.String()) } + +func TestAdminGetL1ToL2MessageByTxnHash(t *testing.T) { + t.Parallel() + + testSuite := createTestSuite(t, &config.CLIConfig{}) + + l1Chain := testSuite.Supersim.Orchestrator.L1Chain() + l1EthClient, _ := ethclient.Dial(l1Chain.Endpoint()) + + var wg sync.WaitGroup + var l1TxMutex sync.Mutex + + l2Chains := testSuite.Supersim.Orchestrator.L2Chains() + wg.Add(len(l2Chains)) + for i, chain := range l2Chains { + go func() { + defer wg.Done() + + l2EthClient, _ := ethclient.Dial(chain.Endpoint()) + privateKey, _ := testSuite.DevKeys.Secret(devkeys.UserKey(i)) + senderAddress, _ := testSuite.DevKeys.Address(devkeys.UserKey(i)) + adminRPCClient, _ := rpc.Dial(testSuite.Supersim.Orchestrator.AdminServer.Endpoint()) + + oneEth := big.NewInt(1e18) + prevBalance, _ := l2EthClient.BalanceAt(context.Background(), senderAddress, nil) + + transactor, _ := bind.NewKeyedTransactorWithChainID(privateKey, big.NewInt(int64(l1Chain.Config().ChainID))) + transactor.Value = oneEth + optimismPortal, _ := opbindings.NewOptimismPortal(common.Address(chain.Config().L2Config.L1Addresses.OptimismPortalProxy), l1EthClient) + + // needs a lock because the gas estimation can be outdated between transactions + l1TxMutex.Lock() + tx, err := optimismPortal.DepositTransaction(transactor, senderAddress, oneEth, 100000, false, make([]byte, 0)) + l1TxMutex.Unlock() + require.NoError(t, err) + + txReceipt, _ := bind.WaitMined(context.Background(), l1EthClient, tx) + require.NoError(t, err) + + require.True(t, txReceipt.Status == 1, "Deposit transaction failed") + require.NotEmpty(t, txReceipt.Logs, "Deposit transaction failed") + + postBalance, postBalanceCheckErr := wait.ForBalanceChange( + context.Background(), + l2EthClient, + senderAddress, + prevBalance, + ) + require.NoError(t, postBalanceCheckErr) + + // check that balance was increased + require.Equal(t, oneEth, postBalance.Sub(postBalance, prevBalance), "Recipient balance is incorrect") + + var message *JSONDepositTx + // msgHash for the above sendERC20 txn + l1TxnHash := txReceipt.TxHash + rpcErr := adminRPCClient.CallContext(context.Background(), &message, "admin_getL1ToL2MessageByTxnHash", l1TxnHash) + require.NoError(t, rpcErr) + + assert.Equal(t, oneEth.String(), message.Value.String()) + assert.Equal(t, oneEth.String(), message.Mint.String()) + assert.Equal(t, false, message.IsSystemTransaction) + }() + } + + wg.Wait() +} From 2b0db5e20c81ba71f787602b2b118835cf8a4558 Mon Sep 17 00:00:00 2001 From: serverConnected Date: Tue, 26 Nov 2024 01:13:03 +0530 Subject: [PATCH 02/22] rebase fix --- opsimulator/opsimulator.go | 2 +- orchestrator/orchestrator.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/opsimulator/opsimulator.go b/opsimulator/opsimulator.go index d1bac198e..ad7da54b4 100644 --- a/opsimulator/opsimulator.go +++ b/opsimulator/opsimulator.go @@ -63,7 +63,7 @@ type OpSimulator struct { } // OpSimulator wraps around the l2 chain. By embedding `Chain`, it also implements the same inteface -func New(log log.Logger, closeApp context.CancelCauseFunc, port uint64, host string, l1Chain, l2Chain config.Chain, peers map[uint64]config.Chain, interopDelay uint64) *OpSimulator { +func New(log log.Logger, closeApp context.CancelCauseFunc, port uint64, host string, l1Chain, l2Chain config.Chain, peers map[uint64]config.Chain, interopDelay uint64, depositStoreManager *L1DepositStoreManager) *OpSimulator { bgTasksCtx, bgTasksCancel := context.WithCancel(context.Background()) crossL2Inbox, err := bindings.NewCrossL2Inbox(predeploys.CrossL2InboxAddr, l2Chain.EthClient()) diff --git a/orchestrator/orchestrator.go b/orchestrator/orchestrator.go index 05d472f61..9cc964115 100644 --- a/orchestrator/orchestrator.go +++ b/orchestrator/orchestrator.go @@ -55,7 +55,7 @@ func NewOrchestrator(log log.Logger, closeApp context.CancelCauseFunc, networkCo for i := range networkConfig.L2Configs { cfg := networkConfig.L2Configs[i] - l2OpSims[cfg.ChainID] = opsimulator.New(log, closeApp, nextL2Port, cfg.Host, l1Anvil, l2Anvils[cfg.ChainID], l2Anvils, networkConfig.InteropDelay) + l2OpSims[cfg.ChainID] = opsimulator.New(log, closeApp, nextL2Port, cfg.Host, l1Anvil, l2Anvils[cfg.ChainID], l2Anvils, networkConfig.InteropDelay, depositStoreMngr) // only increment expected port if it has been specified if nextL2Port > 0 { From 7191e011d4a5499cb15220278be055c7e15e4be3 Mon Sep 17 00:00:00 2001 From: serverConnected Date: Wed, 11 Dec 2024 17:39:41 +0530 Subject: [PATCH 03/22] added modifications --- opsimulator/indexer.go | 114 +++++++++++++++++++++++++++++++++++++ opsimulator/opsimulator.go | 30 +++++----- 2 files changed, 129 insertions(+), 15 deletions(-) create mode 100644 opsimulator/indexer.go diff --git a/opsimulator/indexer.go b/opsimulator/indexer.go new file mode 100644 index 000000000..9487e51bd --- /dev/null +++ b/opsimulator/indexer.go @@ -0,0 +1,114 @@ +package opsimulator + +import ( + "context" + "fmt" + + "github.com/asaskevich/EventBus" + "github.com/ethereum-optimism/optimism/op-service/tasks" + "github.com/ethereum-optimism/supersim/config" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" +) + +type L1ToL2MessageIndexer struct { + log log.Logger + storeManager *L1DepositStoreManager + eb EventBus.Bus + l1Chain config.Chain + tasks tasks.Group + tasksCtx context.Context + tasksCancel context.CancelFunc +} + +func NewL1ToL2MessageIndexer(log log.Logger, l1Chain config.Chain) *L1ToL2MessageIndexer { + tasksCtx, tasksCancel := context.WithCancel(context.Background()) + + return &L1ToL2MessageIndexer{ + log: log, + storeManager: NewL1DepositStoreManager(), + eb: EventBus.New(), + l1Chain: l1Chain, + tasks: tasks.Group{ + HandleCrit: func(err error) { + fmt.Printf("unhandled indexer error: %v\n", err) + }, + }, + tasksCtx: tasksCtx, + tasksCancel: tasksCancel, + } +} + +func (i *L1ToL2MessageIndexer) Start(ctx context.Context) error { + + i.tasks.Go(func() error { + depositTxCh := make(chan *types.DepositTx) + l1DepositTxnLogCh := make(chan *types.Log) + portalAddress := common.Address(i.l1Chain.Config().L2Config.L1Addresses.OptimismPortalProxy) + sub, err := SubscribeDepositTx(i.tasksCtx, i.l1Chain.EthClient(), portalAddress, depositTxCh, l1DepositTxnLogCh) + + if err != nil { + return fmt.Errorf("failed to subscribe to deposit tx: %w", err) + } + + chainID := i.l1Chain.Config().ChainID + + for { + select { + case dep := <-depositTxCh: + + if err := i.processEvent(dep, chainID); err != nil { + fmt.Printf("failed to process log: %v\n", err) + } + + case <-i.tasksCtx.Done(): + sub.Unsubscribe() + } + } + }) + + return nil +} + +func (i *L1ToL2MessageIndexer) Stop(ctx context.Context) error { + i.tasksCancel() + return nil +} + +func depositMessageInfoKey() string { + return fmt.Sprintln("DepositMessageKey") +} + +func (i *L1ToL2MessageIndexer) SubscribeDepositMessage(depositMessageChan chan<- *types.DepositTx) (func(), error) { + return i.createSubscription(depositMessageInfoKey(), depositMessageChan) +} + +func (i *L1ToL2MessageIndexer) createSubscription(key string, depositMessageChan chan<- *types.DepositTx) (func(), error) { + handler := func(e *types.DepositTx) { + depositMessageChan <- e + } + + if err := i.eb.Subscribe(key, handler); err != nil { + return nil, fmt.Errorf("failed to create subscription %s: %w", key, err) + } + + return func() { + _ = i.eb.Unsubscribe(key, handler) + }, nil +} + +func (i *L1ToL2MessageIndexer) processEvent(dep *types.DepositTx, chainID uint64) error { + + depTx := types.NewTx(dep) + i.log.Debug("observed deposit event on L1", "hash", depTx.Hash().String()) + + if err := i.storeManager.Set(depTx.Hash(), dep); err != nil { + i.log.Error("failed to store deposit tx to chain: %w", "chain.id", chainID, "err", err) + return err + } + + i.eb.Publish(depositMessageInfoKey(), depTx) + + return nil +} diff --git a/opsimulator/opsimulator.go b/opsimulator/opsimulator.go index ad7da54b4..f7bffd856 100644 --- a/opsimulator/opsimulator.go +++ b/opsimulator/opsimulator.go @@ -59,7 +59,7 @@ type OpSimulator struct { stopped atomic.Bool - l1DepositStoreManager *L1DepositStoreManager + indexer *L1ToL2MessageIndexer } // OpSimulator wraps around the l2 chain. By embedding `Chain`, it also implements the same inteface @@ -90,9 +90,8 @@ func New(log log.Logger, closeApp context.CancelCauseFunc, port uint64, host str }, }, - l1DepositStoreManager: depositStoreManager, - - peers: peers, + peers: peers, + indexer: NewL1ToL2MessageIndexer(log, l1Chain), } } @@ -127,6 +126,10 @@ func (opSim *OpSimulator) Start(ctx context.Context) error { return fmt.Errorf("failed to create eth client: %w", err) } + if err := opSim.indexer.Start(ctx); err != nil { + return fmt.Errorf("L1ToL2Indexer failed to start: %w", err) + } + opSim.ethClient = ethClient opSim.startBackgroundTasks() return nil @@ -140,6 +143,10 @@ func (opSim *OpSimulator) Stop(ctx context.Context) error { return nil // someone else stopped } + if err := opSim.indexer.Stop(ctx); err != nil { + return errors.New("Failed to stop L1ToL2Indexer") + } + opSim.bgTasksCancel() return opSim.httpServer.Stop(ctx) } @@ -152,11 +159,10 @@ func (opSim *OpSimulator) startBackgroundTasks() { // Relay deposit tx from L1 to L2 opSim.bgTasks.Go(func() error { depositTxCh := make(chan *types.DepositTx) - l1DepositTxnLogCh := make(chan *types.Log) - portalAddress := common.Address(opSim.Config().L2Config.L1Addresses.OptimismPortalProxy) - sub, err := SubscribeDepositTx(context.Background(), opSim.l1Chain.EthClient(), portalAddress, depositTxCh, l1DepositTxnLogCh) + unsubscribe, err := opSim.indexer.SubscribeDepositMessage(depositTxCh) + if err != nil { - return fmt.Errorf("failed to subscribe to deposit tx: %w", err) + opSim.log.Error("Failed to subscribe to indexer") } chainId := opSim.Config().ChainID @@ -165,7 +171,6 @@ func (opSim *OpSimulator) startBackgroundTasks() { select { case dep := <-depositTxCh: depTx := types.NewTx(dep) - l1Log := <-l1DepositTxnLogCh opSim.log.Debug("observed deposit event on L1", "hash", depTx.Hash().String()) clnt := opSim.Chain.EthClient() @@ -173,13 +178,8 @@ func (opSim *OpSimulator) startBackgroundTasks() { opSim.log.Error("failed to submit deposit tx to chain: %w", "chain.id", chainId, "err", err) } - opSim.log.Info("OptimismPortal#depositTransaction", "l2TxHash", depTx.Hash().String()) - if err := opSim.l1DepositStoreManager.Set(l1Log.TxHash, dep); err != nil { - opSim.log.Error("failed to store deposit tx to chain: %w", "chain.id", chainId, "err", err) - } - case <-opSim.bgTasksCtx.Done(): - sub.Unsubscribe() + unsubscribe() close(depositTxCh) return nil } From 823c9f595921c6baa3883ff71ab84921fd26b0bd Mon Sep 17 00:00:00 2001 From: serverConnected Date: Wed, 11 Dec 2024 19:06:11 +0530 Subject: [PATCH 04/22] fixes --- opsimulator/deposits.go | 3 +-- opsimulator/deposits_test.go | 3 +-- opsimulator/indexer.go | 18 +++++++++--------- opsimulator/opsimulator.go | 6 +++--- orchestrator/orchestrator.go | 2 +- supersim_test.go | 5 +++-- 6 files changed, 18 insertions(+), 19 deletions(-) diff --git a/opsimulator/deposits.go b/opsimulator/deposits.go index 2e831b995..fd14ce3fc 100644 --- a/opsimulator/deposits.go +++ b/opsimulator/deposits.go @@ -36,7 +36,7 @@ type LogSubscriber interface { } // transforms Deposit event logs into DepositTx -func SubscribeDepositTx(ctx context.Context, logSub LogSubscriber, depositContractAddr common.Address, ch chan<- *types.DepositTx, chLog chan<- *types.Log) (ethereum.Subscription, error) { +func SubscribeDepositTx(ctx context.Context, logSub LogSubscriber, depositContractAddr common.Address, ch chan<- *types.DepositTx) (ethereum.Subscription, error) { logCh := make(chan types.Log) filterQuery := ethereum.FilterQuery{Addresses: []common.Address{depositContractAddr}, Topics: [][]common.Hash{{derive.DepositEventABIHash}}} logSubscription, err := logSub.SubscribeFilterLogs(ctx, filterQuery, logCh) @@ -61,7 +61,6 @@ func SubscribeDepositTx(ctx context.Context, logSub LogSubscriber, depositContra continue } ch <- dep - chLog <- &log case err := <-logErrCh: errCh <- fmt.Errorf("log subscription error: %w", err) case <-ctx.Done(): diff --git a/opsimulator/deposits_test.go b/opsimulator/deposits_test.go index 6da688417..ee4258b1d 100644 --- a/opsimulator/deposits_test.go +++ b/opsimulator/deposits_test.go @@ -62,9 +62,8 @@ func TestSubscribeDepositTx(t *testing.T) { ctx := context.Background() depositTxCh := make(chan *types.DepositTx, len(mockDepositTxs)) - depositLogCh := make(chan *types.Log, len(mockDepositTxs)) - sub, err := SubscribeDepositTx(ctx, &chain, common.HexToAddress(""), depositTxCh, depositLogCh) + sub, err := SubscribeDepositTx(ctx, &chain, common.HexToAddress(""), depositTxCh) if err != nil { require.NoError(t, err) } diff --git a/opsimulator/indexer.go b/opsimulator/indexer.go index 9487e51bd..07e6586ed 100644 --- a/opsimulator/indexer.go +++ b/opsimulator/indexer.go @@ -9,6 +9,7 @@ import ( "github.com/ethereum-optimism/supersim/config" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/log" ) @@ -16,20 +17,21 @@ type L1ToL2MessageIndexer struct { log log.Logger storeManager *L1DepositStoreManager eb EventBus.Bus - l1Chain config.Chain + l2Chain config.Chain tasks tasks.Group tasksCtx context.Context tasksCancel context.CancelFunc + ethClient *ethclient.Client } -func NewL1ToL2MessageIndexer(log log.Logger, l1Chain config.Chain) *L1ToL2MessageIndexer { +func NewL1ToL2MessageIndexer(log log.Logger, l2Chain config.Chain) *L1ToL2MessageIndexer { tasksCtx, tasksCancel := context.WithCancel(context.Background()) return &L1ToL2MessageIndexer{ log: log, storeManager: NewL1DepositStoreManager(), eb: EventBus.New(), - l1Chain: l1Chain, + l2Chain: l2Chain, tasks: tasks.Group{ HandleCrit: func(err error) { fmt.Printf("unhandled indexer error: %v\n", err) @@ -40,24 +42,22 @@ func NewL1ToL2MessageIndexer(log log.Logger, l1Chain config.Chain) *L1ToL2Messag } } -func (i *L1ToL2MessageIndexer) Start(ctx context.Context) error { +func (i *L1ToL2MessageIndexer) Start(ctx context.Context, client *ethclient.Client) error { i.tasks.Go(func() error { depositTxCh := make(chan *types.DepositTx) - l1DepositTxnLogCh := make(chan *types.Log) - portalAddress := common.Address(i.l1Chain.Config().L2Config.L1Addresses.OptimismPortalProxy) - sub, err := SubscribeDepositTx(i.tasksCtx, i.l1Chain.EthClient(), portalAddress, depositTxCh, l1DepositTxnLogCh) + portalAddress := common.Address(i.l2Chain.Config().L2Config.L1Addresses.OptimismPortalProxy) + sub, err := SubscribeDepositTx(i.tasksCtx, i.l2Chain.EthClient(), portalAddress, depositTxCh) if err != nil { return fmt.Errorf("failed to subscribe to deposit tx: %w", err) } - chainID := i.l1Chain.Config().ChainID + chainID := i.l2Chain.Config().ChainID for { select { case dep := <-depositTxCh: - if err := i.processEvent(dep, chainID); err != nil { fmt.Printf("failed to process log: %v\n", err) } diff --git a/opsimulator/opsimulator.go b/opsimulator/opsimulator.go index f7bffd856..8fd17c617 100644 --- a/opsimulator/opsimulator.go +++ b/opsimulator/opsimulator.go @@ -63,7 +63,7 @@ type OpSimulator struct { } // OpSimulator wraps around the l2 chain. By embedding `Chain`, it also implements the same inteface -func New(log log.Logger, closeApp context.CancelCauseFunc, port uint64, host string, l1Chain, l2Chain config.Chain, peers map[uint64]config.Chain, interopDelay uint64, depositStoreManager *L1DepositStoreManager) *OpSimulator { +func New(log log.Logger, closeApp context.CancelCauseFunc, port uint64, host string, l1Chain, l2Chain config.Chain, peers map[uint64]config.Chain, interopDelay uint64) *OpSimulator { bgTasksCtx, bgTasksCancel := context.WithCancel(context.Background()) crossL2Inbox, err := bindings.NewCrossL2Inbox(predeploys.CrossL2InboxAddr, l2Chain.EthClient()) @@ -91,7 +91,7 @@ func New(log log.Logger, closeApp context.CancelCauseFunc, port uint64, host str }, peers: peers, - indexer: NewL1ToL2MessageIndexer(log, l1Chain), + indexer: NewL1ToL2MessageIndexer(log, l2Chain), } } @@ -126,7 +126,7 @@ func (opSim *OpSimulator) Start(ctx context.Context) error { return fmt.Errorf("failed to create eth client: %w", err) } - if err := opSim.indexer.Start(ctx); err != nil { + if err := opSim.indexer.Start(ctx, ethClient); err != nil { return fmt.Errorf("L1ToL2Indexer failed to start: %w", err) } diff --git a/orchestrator/orchestrator.go b/orchestrator/orchestrator.go index 9cc964115..05d472f61 100644 --- a/orchestrator/orchestrator.go +++ b/orchestrator/orchestrator.go @@ -55,7 +55,7 @@ func NewOrchestrator(log log.Logger, closeApp context.CancelCauseFunc, networkCo for i := range networkConfig.L2Configs { cfg := networkConfig.L2Configs[i] - l2OpSims[cfg.ChainID] = opsimulator.New(log, closeApp, nextL2Port, cfg.Host, l1Anvil, l2Anvils[cfg.ChainID], l2Anvils, networkConfig.InteropDelay, depositStoreMngr) + l2OpSims[cfg.ChainID] = opsimulator.New(log, closeApp, nextL2Port, cfg.Host, l1Anvil, l2Anvils[cfg.ChainID], l2Anvils, networkConfig.InteropDelay) // only increment expected port if it has been specified if nextL2Port > 0 { diff --git a/supersim_test.go b/supersim_test.go index b50824c83..9f40b8daa 100644 --- a/supersim_test.go +++ b/supersim_test.go @@ -14,6 +14,7 @@ import ( "github.com/ethereum-optimism/optimism/op-service/predeploys" "github.com/ethereum-optimism/optimism/op-service/testlog" registry "github.com/ethereum-optimism/superchain-registry/superchain" + "github.com/ethereum-optimism/supersim/admin" "github.com/ethereum-optimism/supersim/bindings" "github.com/ethereum-optimism/supersim/config" "github.com/ethereum-optimism/supersim/interop" @@ -1089,7 +1090,7 @@ func TestAdminGetL2ToL2MessageByMsgHash(t *testing.T) { }) assert.NoError(t, waitErr) - var message *JSONL2ToL2Message + var message *admin.JSONL2ToL2Message // msgHash for the above sendERC20 txn msgHash := "0x3656fd893944321663b2877d10db2895fb68e2346fd7e3f648ce5b986c200166" rpcErr := client.CallContext(context.Background(), &message, "admin_getL2ToL2MessageByMsgHash", msgHash) @@ -1153,7 +1154,7 @@ func TestAdminGetL1ToL2MessageByTxnHash(t *testing.T) { // check that balance was increased require.Equal(t, oneEth, postBalance.Sub(postBalance, prevBalance), "Recipient balance is incorrect") - var message *JSONDepositTx + var message *admin.JSONDepositTx // msgHash for the above sendERC20 txn l1TxnHash := txReceipt.TxHash rpcErr := adminRPCClient.CallContext(context.Background(), &message, "admin_getL1ToL2MessageByTxnHash", l1TxnHash) From 1e3ff9f10f798404cda7f1e6bf435344d87bcf8a Mon Sep 17 00:00:00 2001 From: serverConnected Date: Wed, 11 Dec 2024 19:08:17 +0530 Subject: [PATCH 05/22] conflict fix --- contracts/lib/optimism | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contracts/lib/optimism b/contracts/lib/optimism index 3f43f039a..7b119c533 160000 --- a/contracts/lib/optimism +++ b/contracts/lib/optimism @@ -1 +1 @@ -Subproject commit 3f43f039a9e68b777045d7e2446947acbd9b0592 +Subproject commit 7b119c533f22bd5ef86bed2455b945987ca319a9 From 655818efda3fe7b97a6d93211284135ac0314b67 Mon Sep 17 00:00:00 2001 From: serverConnected Date: Wed, 11 Dec 2024 20:40:34 +0530 Subject: [PATCH 06/22] fixed: indexer setup --- opsimulator/indexer.go | 11 ++++++----- opsimulator/opsimulator.go | 17 ++++++++++------- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/opsimulator/indexer.go b/opsimulator/indexer.go index 07e6586ed..09b8cbf20 100644 --- a/opsimulator/indexer.go +++ b/opsimulator/indexer.go @@ -47,7 +47,7 @@ func (i *L1ToL2MessageIndexer) Start(ctx context.Context, client *ethclient.Clie i.tasks.Go(func() error { depositTxCh := make(chan *types.DepositTx) portalAddress := common.Address(i.l2Chain.Config().L2Config.L1Addresses.OptimismPortalProxy) - sub, err := SubscribeDepositTx(i.tasksCtx, i.l2Chain.EthClient(), portalAddress, depositTxCh) + sub, err := SubscribeDepositTx(i.tasksCtx, client, portalAddress, depositTxCh) if err != nil { return fmt.Errorf("failed to subscribe to deposit tx: %w", err) @@ -80,12 +80,12 @@ func depositMessageInfoKey() string { return fmt.Sprintln("DepositMessageKey") } -func (i *L1ToL2MessageIndexer) SubscribeDepositMessage(depositMessageChan chan<- *types.DepositTx) (func(), error) { +func (i *L1ToL2MessageIndexer) SubscribeDepositMessage(depositMessageChan chan<- *types.Transaction) (func(), error) { return i.createSubscription(depositMessageInfoKey(), depositMessageChan) } -func (i *L1ToL2MessageIndexer) createSubscription(key string, depositMessageChan chan<- *types.DepositTx) (func(), error) { - handler := func(e *types.DepositTx) { +func (i *L1ToL2MessageIndexer) createSubscription(key string, depositMessageChan chan<- *types.Transaction) (func(), error) { + handler := func(e *types.Transaction) { depositMessageChan <- e } @@ -101,7 +101,8 @@ func (i *L1ToL2MessageIndexer) createSubscription(key string, depositMessageChan func (i *L1ToL2MessageIndexer) processEvent(dep *types.DepositTx, chainID uint64) error { depTx := types.NewTx(dep) - i.log.Debug("observed deposit event on L1", "hash", depTx.Hash().String()) + i.log.Info("observed deposit event on L1", "hash", depTx.Hash().String()) + fmt.Println(depTx.Hash().String()) if err := i.storeManager.Set(depTx.Hash(), dep); err != nil { i.log.Error("failed to store deposit tx to chain: %w", "chain.id", chainID, "err", err) diff --git a/opsimulator/opsimulator.go b/opsimulator/opsimulator.go index 3706875ae..78f0ef295 100644 --- a/opsimulator/opsimulator.go +++ b/opsimulator/opsimulator.go @@ -71,10 +71,12 @@ func New(log log.Logger, closeApp context.CancelCauseFunc, port uint64, host str closeApp(fmt.Errorf("failed to create cross L2 inbox: %w", err)) } + newLog := log.New("chain.id", l2Chain.Config().ChainID) + return &OpSimulator{ Chain: l2Chain, - log: log.New("chain.id", l2Chain.Config().ChainID), + log: newLog, port: port, host: host, l1Chain: l1Chain, @@ -91,7 +93,7 @@ func New(log log.Logger, closeApp context.CancelCauseFunc, port uint64, host str }, peers: peers, - indexer: NewL1ToL2MessageIndexer(log, l2Chain), + indexer: NewL1ToL2MessageIndexer(newLog, l2Chain), } } @@ -126,7 +128,7 @@ func (opSim *OpSimulator) Start(ctx context.Context) error { return fmt.Errorf("failed to create eth client: %w", err) } - if err := opSim.indexer.Start(ctx, ethClient); err != nil { + if err := opSim.indexer.Start(ctx, opSim.l1Chain.EthClient()); err != nil { return fmt.Errorf("L1ToL2Indexer failed to start: %w", err) } @@ -162,7 +164,7 @@ func (opSim *OpSimulator) EthClient() *ethclient.Client { func (opSim *OpSimulator) startBackgroundTasks() { // Relay deposit tx from L1 to L2 opSim.bgTasks.Go(func() error { - depositTxCh := make(chan *types.DepositTx) + depositTxCh := make(chan *types.Transaction) unsubscribe, err := opSim.indexer.SubscribeDepositMessage(depositTxCh) if err != nil { @@ -174,14 +176,15 @@ func (opSim *OpSimulator) startBackgroundTasks() { for { select { case dep := <-depositTxCh: - depTx := types.NewTx(dep) - opSim.log.Debug("observed deposit event on L1", "hash", depTx.Hash().String()) + opSim.log.Debug("observed deposit event on L1", "hash", dep.Hash().String()) clnt := opSim.Chain.EthClient() - if err := clnt.SendTransaction(opSim.bgTasksCtx, depTx); err != nil { + if err := clnt.SendTransaction(opSim.bgTasksCtx, dep); err != nil { opSim.log.Error("failed to submit deposit tx to chain: %w", "chain.id", chainId, "err", err) } + opSim.log.Info("OptimismPortal#depositTransaction", "l2TxHash", dep.Hash().String()) + case <-opSim.bgTasksCtx.Done(): unsubscribe() close(depositTxCh) From f74c4f68b97ac5e9d8caff2dff0570c9df886a71 Mon Sep 17 00:00:00 2001 From: serverConnected Date: Thu, 12 Dec 2024 02:27:52 +0530 Subject: [PATCH 07/22] final changes --- interop/indexer.go | 1 + opsimulator/deposits.go | 12 +++++++++--- opsimulator/indexer.go | 22 +++++++++++----------- opsimulator/opsimulator.go | 8 ++++---- orchestrator/orchestrator.go | 2 +- 5 files changed, 26 insertions(+), 19 deletions(-) diff --git a/interop/indexer.go b/interop/indexer.go index 3117c86e5..494f8378b 100644 --- a/interop/indexer.go +++ b/interop/indexer.go @@ -49,6 +49,7 @@ func (i *L2ToL2MessageIndexer) Start(ctx context.Context, clients map[uint64]*et for chainID, client := range i.clients { i.tasks.Go(func() error { + logCh := make(chan types.Log) fq := ethereum.FilterQuery{Addresses: []common.Address{predeploys.L2toL2CrossDomainMessengerAddr}} sub, err := client.SubscribeFilterLogs(i.tasksCtx, fq, logCh) diff --git a/opsimulator/deposits.go b/opsimulator/deposits.go index fd14ce3fc..3f0bf8c48 100644 --- a/opsimulator/deposits.go +++ b/opsimulator/deposits.go @@ -23,8 +23,15 @@ type depositTxSubscription struct { } func (d *depositTxSubscription) Unsubscribe() { - d.logSubscription.Unsubscribe() - d.doneCh <- struct{}{} + // since mul opsims run sub to indexer twice, a select needs to be added to avoid and race condition + select { + case <-d.doneCh: + // Already closed, do nothing + return + default: + d.logSubscription.Unsubscribe() + close(d.doneCh) // Close it here to signal termination + } } func (d *depositTxSubscription) Err() <-chan error { @@ -51,7 +58,6 @@ func SubscribeDepositTx(ctx context.Context, logSub LogSubscriber, depositContra go func() { defer close(logCh) defer close(errCh) - defer close(doneCh) for { select { case log := <-logCh: diff --git a/opsimulator/indexer.go b/opsimulator/indexer.go index 09b8cbf20..9f4ad4774 100644 --- a/opsimulator/indexer.go +++ b/opsimulator/indexer.go @@ -24,14 +24,13 @@ type L1ToL2MessageIndexer struct { ethClient *ethclient.Client } -func NewL1ToL2MessageIndexer(log log.Logger, l2Chain config.Chain) *L1ToL2MessageIndexer { +func NewL1ToL2MessageIndexer(log log.Logger, storeManager *L1DepositStoreManager) *L1ToL2MessageIndexer { tasksCtx, tasksCancel := context.WithCancel(context.Background()) return &L1ToL2MessageIndexer{ log: log, - storeManager: NewL1DepositStoreManager(), + storeManager: storeManager, eb: EventBus.New(), - l2Chain: l2Chain, tasks: tasks.Group{ HandleCrit: func(err error) { fmt.Printf("unhandled indexer error: %v\n", err) @@ -42,11 +41,13 @@ func NewL1ToL2MessageIndexer(log log.Logger, l2Chain config.Chain) *L1ToL2Messag } } -func (i *L1ToL2MessageIndexer) Start(ctx context.Context, client *ethclient.Client) error { +func (i *L1ToL2MessageIndexer) Start(ctx context.Context, client *ethclient.Client, l2Chain config.Chain) error { + + i.l2Chain = l2Chain i.tasks.Go(func() error { depositTxCh := make(chan *types.DepositTx) - portalAddress := common.Address(i.l2Chain.Config().L2Config.L1Addresses.OptimismPortalProxy) + portalAddress := common.Address(l2Chain.Config().L2Config.L1Addresses.OptimismPortalProxy) sub, err := SubscribeDepositTx(i.tasksCtx, client, portalAddress, depositTxCh) if err != nil { @@ -76,12 +77,12 @@ func (i *L1ToL2MessageIndexer) Stop(ctx context.Context) error { return nil } -func depositMessageInfoKey() string { - return fmt.Sprintln("DepositMessageKey") +func depositMessageInfoKey(destinationChainID uint64) string { + return fmt.Sprintf("DepositMessageKey:destination:%d", destinationChainID) } -func (i *L1ToL2MessageIndexer) SubscribeDepositMessage(depositMessageChan chan<- *types.Transaction) (func(), error) { - return i.createSubscription(depositMessageInfoKey(), depositMessageChan) +func (i *L1ToL2MessageIndexer) SubscribeDepositMessage(destinationChainID uint64, depositMessageChan chan<- *types.Transaction) (func(), error) { + return i.createSubscription(depositMessageInfoKey(destinationChainID), depositMessageChan) } func (i *L1ToL2MessageIndexer) createSubscription(key string, depositMessageChan chan<- *types.Transaction) (func(), error) { @@ -109,7 +110,6 @@ func (i *L1ToL2MessageIndexer) processEvent(dep *types.DepositTx, chainID uint64 return err } - i.eb.Publish(depositMessageInfoKey(), depTx) - + i.eb.Publish(depositMessageInfoKey(chainID), depTx) return nil } diff --git a/opsimulator/opsimulator.go b/opsimulator/opsimulator.go index 78f0ef295..c0ec2bb22 100644 --- a/opsimulator/opsimulator.go +++ b/opsimulator/opsimulator.go @@ -63,7 +63,7 @@ type OpSimulator struct { } // OpSimulator wraps around the l2 chain. By embedding `Chain`, it also implements the same inteface -func New(log log.Logger, closeApp context.CancelCauseFunc, port uint64, host string, l1Chain, l2Chain config.Chain, peers map[uint64]config.Chain, interopDelay uint64) *OpSimulator { +func New(log log.Logger, closeApp context.CancelCauseFunc, port uint64, host string, l1Chain, l2Chain config.Chain, peers map[uint64]config.Chain, interopDelay uint64, storeManager *L1DepositStoreManager) *OpSimulator { bgTasksCtx, bgTasksCancel := context.WithCancel(context.Background()) crossL2Inbox, err := bindings.NewCrossL2Inbox(predeploys.CrossL2InboxAddr, l2Chain.EthClient()) @@ -93,7 +93,7 @@ func New(log log.Logger, closeApp context.CancelCauseFunc, port uint64, host str }, peers: peers, - indexer: NewL1ToL2MessageIndexer(newLog, l2Chain), + indexer: NewL1ToL2MessageIndexer(newLog, storeManager), } } @@ -128,7 +128,7 @@ func (opSim *OpSimulator) Start(ctx context.Context) error { return fmt.Errorf("failed to create eth client: %w", err) } - if err := opSim.indexer.Start(ctx, opSim.l1Chain.EthClient()); err != nil { + if err := opSim.indexer.Start(ctx, opSim.l1Chain.EthClient(), opSim.Chain); err != nil { return fmt.Errorf("L1ToL2Indexer failed to start: %w", err) } @@ -165,7 +165,7 @@ func (opSim *OpSimulator) startBackgroundTasks() { // Relay deposit tx from L1 to L2 opSim.bgTasks.Go(func() error { depositTxCh := make(chan *types.Transaction) - unsubscribe, err := opSim.indexer.SubscribeDepositMessage(depositTxCh) + unsubscribe, err := opSim.indexer.SubscribeDepositMessage(opSim.Config().ChainID, depositTxCh) if err != nil { opSim.log.Error("Failed to subscribe to indexer") diff --git a/orchestrator/orchestrator.go b/orchestrator/orchestrator.go index e2399168a..f8a250e6c 100644 --- a/orchestrator/orchestrator.go +++ b/orchestrator/orchestrator.go @@ -55,7 +55,7 @@ func NewOrchestrator(log log.Logger, closeApp context.CancelCauseFunc, networkCo for i := range networkConfig.L2Configs { cfg := networkConfig.L2Configs[i] - l2OpSims[cfg.ChainID] = opsimulator.New(log, closeApp, nextL2Port, cfg.Host, l1Anvil, l2Anvils[cfg.ChainID], l2Anvils, networkConfig.InteropDelay) + l2OpSims[cfg.ChainID] = opsimulator.New(log, closeApp, nextL2Port, cfg.Host, l1Anvil, l2Anvils[cfg.ChainID], l2Anvils, networkConfig.InteropDelay, depositStoreMngr) // only increment expected port if it has been specified if nextL2Port > 0 { From 11ab6499ccc3e2d4d9113a974d16bf0af26a1b37 Mon Sep 17 00:00:00 2001 From: serverConnected Date: Thu, 12 Dec 2024 15:30:05 +0530 Subject: [PATCH 08/22] fixed test case and use source hash as identifier --- opsimulator/indexer.go | 5 ++--- supersim_test.go | 11 ++++++++--- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/opsimulator/indexer.go b/opsimulator/indexer.go index 9f4ad4774..bc980f3e2 100644 --- a/opsimulator/indexer.go +++ b/opsimulator/indexer.go @@ -102,10 +102,9 @@ func (i *L1ToL2MessageIndexer) createSubscription(key string, depositMessageChan func (i *L1ToL2MessageIndexer) processEvent(dep *types.DepositTx, chainID uint64) error { depTx := types.NewTx(dep) - i.log.Info("observed deposit event on L1", "hash", depTx.Hash().String()) - fmt.Println(depTx.Hash().String()) + i.log.Info("observed deposit event on L1", "hash", depTx.Hash().String(), "SourceHash", dep.SourceHash.String()) - if err := i.storeManager.Set(depTx.Hash(), dep); err != nil { + if err := i.storeManager.Set(dep.SourceHash, dep); err != nil { i.log.Error("failed to store deposit tx to chain: %w", "chain.id", chainID, "err", err) return err } diff --git a/supersim_test.go b/supersim_test.go index 237cc80bd..4f696526f 100644 --- a/supersim_test.go +++ b/supersim_test.go @@ -10,7 +10,9 @@ import ( "github.com/ethereum-optimism/optimism/op-chain-ops/devkeys" opbindings "github.com/ethereum-optimism/optimism/op-e2e/bindings" + "github.com/ethereum-optimism/optimism/op-e2e/e2eutils/receipts" "github.com/ethereum-optimism/optimism/op-e2e/e2eutils/wait" + "github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-service/predeploys" "github.com/ethereum-optimism/optimism/op-service/testlog" registry "github.com/ethereum-optimism/superchain-registry/superchain" @@ -1169,10 +1171,13 @@ func TestAdminGetL1ToL2MessageByTxnHash(t *testing.T) { // check that balance was increased require.Equal(t, oneEth, postBalance.Sub(postBalance, prevBalance), "Recipient balance is incorrect") + depositEvent, err := receipts.FindLog(txReceipt.Logs, optimismPortal.ParseTransactionDeposited) + require.NoError(t, err, "Should emit deposit event") + depositTx, err := derive.UnmarshalDepositLogEvent(&depositEvent.Raw) + require.NoError(t, err) + var message *admin.JSONDepositTx - // msgHash for the above sendERC20 txn - l1TxnHash := txReceipt.TxHash - rpcErr := adminRPCClient.CallContext(context.Background(), &message, "admin_getL1ToL2MessageByTxnHash", l1TxnHash) + rpcErr := adminRPCClient.CallContext(context.Background(), &message, "admin_getL1ToL2MessageByTxnHash", depositTx.SourceHash) require.NoError(t, rpcErr) assert.Equal(t, oneEth.String(), message.Value.String()) From 8f9f2642e592022913c410e1b563def25d43f5e9 Mon Sep 17 00:00:00 2001 From: serverConnected Date: Thu, 12 Dec 2024 15:32:57 +0530 Subject: [PATCH 09/22] removed unneeded type --- supersim_test.go | 20 -------------------- 1 file changed, 20 deletions(-) diff --git a/supersim_test.go b/supersim_test.go index 4f696526f..c0017730b 100644 --- a/supersim_test.go +++ b/supersim_test.go @@ -68,26 +68,6 @@ type TestSuite struct { Supersim *Supersim } -type JSONL2ToL2Message struct { - Destination uint64 `json:"Destination"` - Source uint64 `json:"Source"` - Nonce *big.Int `json:"Nonce"` - Sender common.Address `json:"Sender"` - Target common.Address `json:"Target"` - Message hexutil.Bytes `json:"Message"` -} - -type JSONDepositTx struct { - SourceHash common.Hash `json:"SourceHash"` - From common.Address `json:"From"` - To *common.Address `json:"To"` - Mint *big.Int `json:"Mint"` - Value *big.Int `json:"Value"` - Gas uint64 `json:"Gas"` - IsSystemTransaction bool `json:"IsSystemTransaction"` - Data hexutil.Bytes `json:"Data"` -} - type InteropTestSuite struct { t *testing.T From ef265c24a90ba5c66508e008762ec3086ffd0314 Mon Sep 17 00:00:00 2001 From: serverConnected Date: Thu, 12 Dec 2024 15:35:13 +0530 Subject: [PATCH 10/22] fixed comment --- opsimulator/deposits.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/opsimulator/deposits.go b/opsimulator/deposits.go index 3f0bf8c48..687593ce1 100644 --- a/opsimulator/deposits.go +++ b/opsimulator/deposits.go @@ -23,14 +23,13 @@ type depositTxSubscription struct { } func (d *depositTxSubscription) Unsubscribe() { - // since mul opsims run sub to indexer twice, a select needs to be added to avoid and race condition + // since multiple opsims run subcription to indexer multiple times, a select needs to be added to avoid any race condition select { case <-d.doneCh: - // Already closed, do nothing return default: d.logSubscription.Unsubscribe() - close(d.doneCh) // Close it here to signal termination + close(d.doneCh) } } From 235ab768993616ab6e9e320c98c872cda71b06d8 Mon Sep 17 00:00:00 2001 From: serverConnected Date: Thu, 12 Dec 2024 17:45:11 +0530 Subject: [PATCH 11/22] created a channel wrapper, to pass txnData and log --- opsimulator/deposits.go | 11 +++++++++-- opsimulator/deposits_test.go | 8 +++++++- opsimulator/indexer.go | 9 ++++++++- 3 files changed, 24 insertions(+), 4 deletions(-) diff --git a/opsimulator/deposits.go b/opsimulator/deposits.go index 687593ce1..e6713e446 100644 --- a/opsimulator/deposits.go +++ b/opsimulator/deposits.go @@ -22,6 +22,11 @@ type depositTxSubscription struct { doneCh chan struct{} } +type DepositChannels struct { + DepositTxCh chan<- *types.DepositTx + LogCh chan<- types.Log +} + func (d *depositTxSubscription) Unsubscribe() { // since multiple opsims run subcription to indexer multiple times, a select needs to be added to avoid any race condition select { @@ -42,7 +47,7 @@ type LogSubscriber interface { } // transforms Deposit event logs into DepositTx -func SubscribeDepositTx(ctx context.Context, logSub LogSubscriber, depositContractAddr common.Address, ch chan<- *types.DepositTx) (ethereum.Subscription, error) { +func SubscribeDepositTx(ctx context.Context, logSub LogSubscriber, depositContractAddr common.Address, channels DepositChannels) (ethereum.Subscription, error) { logCh := make(chan types.Log) filterQuery := ethereum.FilterQuery{Addresses: []common.Address{depositContractAddr}, Topics: [][]common.Hash{{derive.DepositEventABIHash}}} logSubscription, err := logSub.SubscribeFilterLogs(ctx, filterQuery, logCh) @@ -65,7 +70,9 @@ func SubscribeDepositTx(ctx context.Context, logSub LogSubscriber, depositContra errCh <- err continue } - ch <- dep + + channels.DepositTxCh <- dep + channels.LogCh <- log case err := <-logErrCh: errCh <- fmt.Errorf("log subscription error: %w", err) case <-ctx.Done(): diff --git a/opsimulator/deposits_test.go b/opsimulator/deposits_test.go index ee4258b1d..662ca0194 100644 --- a/opsimulator/deposits_test.go +++ b/opsimulator/deposits_test.go @@ -62,8 +62,14 @@ func TestSubscribeDepositTx(t *testing.T) { ctx := context.Background() depositTxCh := make(chan *types.DepositTx, len(mockDepositTxs)) + logCh := make(chan types.Log, len(mockDepositTxs)) - sub, err := SubscribeDepositTx(ctx, &chain, common.HexToAddress(""), depositTxCh) + channels := DepositChannels{ + DepositTxCh: depositTxCh, + LogCh: logCh, + } + + sub, err := SubscribeDepositTx(ctx, &chain, common.HexToAddress(""), channels) if err != nil { require.NoError(t, err) } diff --git a/opsimulator/indexer.go b/opsimulator/indexer.go index bc980f3e2..d081be8e6 100644 --- a/opsimulator/indexer.go +++ b/opsimulator/indexer.go @@ -47,8 +47,15 @@ func (i *L1ToL2MessageIndexer) Start(ctx context.Context, client *ethclient.Clie i.tasks.Go(func() error { depositTxCh := make(chan *types.DepositTx) + logCh := make(chan types.Log) + + channels := DepositChannels{ + DepositTxCh: depositTxCh, + LogCh: logCh, + } + portalAddress := common.Address(l2Chain.Config().L2Config.L1Addresses.OptimismPortalProxy) - sub, err := SubscribeDepositTx(i.tasksCtx, client, portalAddress, depositTxCh) + sub, err := SubscribeDepositTx(i.tasksCtx, client, portalAddress, channels) if err != nil { return fmt.Errorf("failed to subscribe to deposit tx: %w", err) From c9c372e5ecb0fc4351286ec4a15e370df027f84d Mon Sep 17 00:00:00 2001 From: serverConnected Date: Thu, 12 Dec 2024 18:35:32 +0530 Subject: [PATCH 12/22] created deposit message to include more info while storing --- admin/admin.go | 54 +++++++++++++++++++++++++++++++-------- opsimulator/indexer.go | 15 ++++++++--- opsimulator/store.go | 17 +++++++----- opsimulator/store_test.go | 14 +++++++--- supersim_test.go | 8 +++--- 5 files changed, 82 insertions(+), 26 deletions(-) diff --git a/admin/admin.go b/admin/admin.go index 977a3cd3f..740ff46fc 100644 --- a/admin/admin.go +++ b/admin/admin.go @@ -64,6 +64,23 @@ type JSONDepositTx struct { Data hexutil.Bytes `json:"Data"` } +type JSONDepositLog struct { + Address common.Address `json:"Address"` + Topics []common.Hash `json:"Topics"` + Data []byte `json:"Data"` + BlockNumber uint64 `json:"BlockNumber"` + TxHash common.Hash `json:"TxHash"` + TxIndex uint `json:"TxIndex"` + BlockHash common.Hash `json:"BlockHash"` + Index uint `json:"Index"` + Removed bool `json:"Removed"` +} + +type JSONDepositMessage struct { + DepositTxn JSONDepositTx + DepositLog JSONDepositLog +} + func (e *JSONRPCError) Error() string { return e.Message } @@ -239,7 +256,7 @@ func (m *RPCMethods) GetL2ToL2MessageByMsgHash(args *common.Hash) (*JSONL2ToL2Me }, nil } -func (m *RPCMethods) GetL1ToL2MessageByTxnHash(args *common.Hash) (*JSONDepositTx, error) { +func (m *RPCMethods) GetL1ToL2MessageByTxnHash(args *common.Hash) (*JSONDepositMessage, error) { if m.l1DepositStore == nil { return nil, &JSONRPCError{ Code: -32601, @@ -263,14 +280,31 @@ func (m *RPCMethods) GetL1ToL2MessageByTxnHash(args *common.Hash) (*JSONDepositT } } - return &JSONDepositTx{ - SourceHash: storeEntry.SourceHash, - From: storeEntry.From, - To: storeEntry.To, - Mint: storeEntry.Mint, - Value: storeEntry.Value, - Gas: storeEntry.Gas, - IsSystemTransaction: storeEntry.IsSystemTransaction, - Data: storeEntry.Data, + depositTxn := JSONDepositTx{ + SourceHash: storeEntry.DepositTxn.SourceHash, + From: storeEntry.DepositTxn.From, + To: storeEntry.DepositTxn.To, + Mint: storeEntry.DepositTxn.Mint, + Value: storeEntry.DepositTxn.Value, + Gas: storeEntry.DepositTxn.Gas, + IsSystemTransaction: storeEntry.DepositTxn.IsSystemTransaction, + Data: storeEntry.DepositTxn.Data, + } + + depositLog := JSONDepositLog{ + Address: storeEntry.DepositLog.Address, + Topics: storeEntry.DepositLog.Topics, + Data: storeEntry.DepositLog.Data, + BlockNumber: storeEntry.DepositLog.BlockNumber, + TxHash: storeEntry.DepositLog.TxHash, + TxIndex: storeEntry.DepositLog.TxIndex, + BlockHash: storeEntry.DepositLog.BlockHash, + Index: storeEntry.DepositLog.Index, + Removed: storeEntry.DepositLog.Removed, + } + + return &JSONDepositMessage{ + DepositTxn: depositTxn, + DepositLog: depositLog, }, nil } diff --git a/opsimulator/indexer.go b/opsimulator/indexer.go index d081be8e6..0993182d7 100644 --- a/opsimulator/indexer.go +++ b/opsimulator/indexer.go @@ -49,6 +49,9 @@ func (i *L1ToL2MessageIndexer) Start(ctx context.Context, client *ethclient.Clie depositTxCh := make(chan *types.DepositTx) logCh := make(chan types.Log) + defer close(depositTxCh) + defer close(logCh) + channels := DepositChannels{ DepositTxCh: depositTxCh, LogCh: logCh, @@ -66,7 +69,8 @@ func (i *L1ToL2MessageIndexer) Start(ctx context.Context, client *ethclient.Clie for { select { case dep := <-depositTxCh: - if err := i.processEvent(dep, chainID); err != nil { + log := <-logCh + if err := i.processEvent(dep, log, chainID); err != nil { fmt.Printf("failed to process log: %v\n", err) } @@ -106,12 +110,17 @@ func (i *L1ToL2MessageIndexer) createSubscription(key string, depositMessageChan }, nil } -func (i *L1ToL2MessageIndexer) processEvent(dep *types.DepositTx, chainID uint64) error { +func (i *L1ToL2MessageIndexer) processEvent(dep *types.DepositTx, log types.Log, chainID uint64) error { depTx := types.NewTx(dep) i.log.Info("observed deposit event on L1", "hash", depTx.Hash().String(), "SourceHash", dep.SourceHash.String()) - if err := i.storeManager.Set(dep.SourceHash, dep); err != nil { + depositMessage := L1DepositMessage{ + DepositTxn: dep, + DepositLog: log, + } + + if err := i.storeManager.Set(dep.SourceHash, &depositMessage); err != nil { i.log.Error("failed to store deposit tx to chain: %w", "chain.id", chainID, "err", err) return err } diff --git a/opsimulator/store.go b/opsimulator/store.go index f97e3626e..4007a883d 100644 --- a/opsimulator/store.go +++ b/opsimulator/store.go @@ -8,8 +8,13 @@ import ( "github.com/ethereum/go-ethereum/core/types" ) +type L1DepositMessage struct { + DepositTxn *types.DepositTx + DepositLog types.Log +} + type L1DepositStore struct { - entryByHash map[common.Hash]*types.DepositTx + entryByHash map[common.Hash]*L1DepositMessage mu sync.RWMutex } @@ -19,7 +24,7 @@ type L1DepositStoreManager struct { func NewL1DepositStore() *L1DepositStore { return &L1DepositStore{ - entryByHash: make(map[common.Hash]*types.DepositTx), + entryByHash: make(map[common.Hash]*L1DepositMessage), } } @@ -29,7 +34,7 @@ func NewL1DepositStoreManager() *L1DepositStoreManager { } } -func (s *L1DepositStore) Set(txnHash common.Hash, entry *types.DepositTx) error { +func (s *L1DepositStore) Set(txnHash common.Hash, entry *L1DepositMessage) error { s.mu.Lock() defer s.mu.Unlock() @@ -37,7 +42,7 @@ func (s *L1DepositStore) Set(txnHash common.Hash, entry *types.DepositTx) error return nil } -func (s *L1DepositStore) Get(txnHash common.Hash) (*types.DepositTx, error) { +func (s *L1DepositStore) Get(txnHash common.Hash) (*L1DepositMessage, error) { s.mu.RLock() defer s.mu.RUnlock() @@ -50,11 +55,11 @@ func (s *L1DepositStore) Get(txnHash common.Hash) (*types.DepositTx, error) { return entry, nil } -func (s *L1DepositStoreManager) Get(txnHash common.Hash) (*types.DepositTx, error) { +func (s *L1DepositStoreManager) Get(txnHash common.Hash) (*L1DepositMessage, error) { return s.store.Get(txnHash) } -func (s *L1DepositStoreManager) Set(txnHash common.Hash, entry *types.DepositTx) error { +func (s *L1DepositStoreManager) Set(txnHash common.Hash, entry *L1DepositMessage) error { if err := s.store.Set(txnHash, entry); err != nil { return fmt.Errorf("failed to store message: %w", err) } diff --git a/opsimulator/store_test.go b/opsimulator/store_test.go index f56133dd0..e3e516920 100644 --- a/opsimulator/store_test.go +++ b/opsimulator/store_test.go @@ -4,6 +4,7 @@ import ( "math/rand" "testing" + "github.com/ethereum-optimism/optimism/op-service/testutils" optestutils "github.com/ethereum-optimism/optimism/op-service/testutils" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -16,13 +17,20 @@ func TestL1DepositStore_SetAndGet(t *testing.T) { rng := rand.New(rand.NewSource(int64(0))) sourceHash := common.Hash{} depInput := optestutils.GenerateDeposit(sourceHash, rng) + depLog := optestutils.GenerateLog(testutils.RandomAddress(rng), nil, nil) depTx := types.NewTx(depInput) - txnHash := depTx.Hash() + txnHash := depTx.SourceHash() - err := sm.store.Set(txnHash, depInput) + depositMessage := L1DepositMessage{ + DepositTxn: depInput, + DepositLog: *depLog, + } + + err := sm.store.Set(txnHash, &depositMessage) assert.NoError(t, err, "expect no error while store deposit txn ref") retrievedEntry, err := sm.store.Get(txnHash) assert.NoError(t, err, "expected no error when getting entry from store") - assert.Equal(t, depInput, retrievedEntry, "expected retrieved entry to equal stored entry") + assert.Equal(t, depositMessage.DepositTxn, retrievedEntry.DepositTxn, "expected retrieved depositTxn to equal stored depositTxn") + assert.Equal(t, depositMessage.DepositLog, retrievedEntry.DepositLog, "expected retrieved depositLog to equal stored depositLog") } diff --git a/supersim_test.go b/supersim_test.go index c0017730b..eb430f309 100644 --- a/supersim_test.go +++ b/supersim_test.go @@ -1156,13 +1156,13 @@ func TestAdminGetL1ToL2MessageByTxnHash(t *testing.T) { depositTx, err := derive.UnmarshalDepositLogEvent(&depositEvent.Raw) require.NoError(t, err) - var message *admin.JSONDepositTx + var message *admin.JSONDepositMessage rpcErr := adminRPCClient.CallContext(context.Background(), &message, "admin_getL1ToL2MessageByTxnHash", depositTx.SourceHash) require.NoError(t, rpcErr) - assert.Equal(t, oneEth.String(), message.Value.String()) - assert.Equal(t, oneEth.String(), message.Mint.String()) - assert.Equal(t, false, message.IsSystemTransaction) + assert.Equal(t, oneEth.String(), message.DepositTxn.Value.String()) + assert.Equal(t, oneEth.String(), message.DepositTxn.Mint.String()) + assert.Equal(t, false, message.DepositTxn.IsSystemTransaction) }() } From 8befd29ed61cda3dd78bf08b35416c69d8be1d2e Mon Sep 17 00:00:00 2001 From: serverConnected Date: Thu, 12 Dec 2024 19:04:39 +0530 Subject: [PATCH 13/22] small comment update --- opsimulator/deposits.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opsimulator/deposits.go b/opsimulator/deposits.go index e6713e446..72b2dd3e1 100644 --- a/opsimulator/deposits.go +++ b/opsimulator/deposits.go @@ -28,7 +28,7 @@ type DepositChannels struct { } func (d *depositTxSubscription) Unsubscribe() { - // since multiple opsims run subcription to indexer multiple times, a select needs to be added to avoid any race condition + // since multiple opsims run subcription to indexer multiple times, a select needs to be added to avoid any race condition leading to a panic select { case <-d.doneCh: return From 9d8cf8a5558d3374915f4d70e8da869929883b05 Mon Sep 17 00:00:00 2001 From: serverConnected Date: Fri, 13 Dec 2024 15:08:56 +0530 Subject: [PATCH 14/22] type update --- admin/admin.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/admin/admin.go b/admin/admin.go index 740ff46fc..51a3a357c 100644 --- a/admin/admin.go +++ b/admin/admin.go @@ -67,7 +67,7 @@ type JSONDepositTx struct { type JSONDepositLog struct { Address common.Address `json:"Address"` Topics []common.Hash `json:"Topics"` - Data []byte `json:"Data"` + Data hexutil.Bytes `json:"Data"` BlockNumber uint64 `json:"BlockNumber"` TxHash common.Hash `json:"TxHash"` TxIndex uint `json:"TxIndex"` From 10df06a095cbef7c624634d6fa99d4fad9e8bd9f Mon Sep 17 00:00:00 2001 From: serverConnected Date: Fri, 13 Dec 2024 15:13:34 +0530 Subject: [PATCH 15/22] variable name update --- opsimulator/opsimulator.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/opsimulator/opsimulator.go b/opsimulator/opsimulator.go index c0ec2bb22..03cc6e066 100644 --- a/opsimulator/opsimulator.go +++ b/opsimulator/opsimulator.go @@ -175,15 +175,15 @@ func (opSim *OpSimulator) startBackgroundTasks() { for { select { - case dep := <-depositTxCh: - opSim.log.Debug("observed deposit event on L1", "hash", dep.Hash().String()) + case depTx := <-depositTxCh: + opSim.log.Debug("observed deposit event on L1", "hash", depTx.Hash().String()) clnt := opSim.Chain.EthClient() - if err := clnt.SendTransaction(opSim.bgTasksCtx, dep); err != nil { + if err := clnt.SendTransaction(opSim.bgTasksCtx, depTx); err != nil { opSim.log.Error("failed to submit deposit tx to chain: %w", "chain.id", chainId, "err", err) } - opSim.log.Info("OptimismPortal#depositTransaction", "l2TxHash", dep.Hash().String()) + opSim.log.Info("OptimismPortal#depositTransaction", "l2TxHash", depTx.Hash().String()) case <-opSim.bgTasksCtx.Done(): unsubscribe() From b30e1dcb1817490974c4be5d5510b7ebe32f1a87 Mon Sep 17 00:00:00 2001 From: serverConnected Date: Thu, 2 Jan 2025 19:32:44 +0530 Subject: [PATCH 16/22] re arranged indexer init logic --- opsimulator/deposits.go | 99 ----------------- opsimulator/indexer.go | 105 +++++++++++++++++- .../{deposits_test.go => indexer_test.go} | 51 +++++++++ opsimulator/opsimulator.go | 21 +--- orchestrator/orchestrator.go | 39 ++++--- 5 files changed, 185 insertions(+), 130 deletions(-) delete mode 100644 opsimulator/deposits.go rename opsimulator/{deposits_test.go => indexer_test.go} (60%) diff --git a/opsimulator/deposits.go b/opsimulator/deposits.go deleted file mode 100644 index 72b2dd3e1..000000000 --- a/opsimulator/deposits.go +++ /dev/null @@ -1,99 +0,0 @@ -package opsimulator - -import ( - "context" - "errors" - "fmt" - - "github.com/ethereum-optimism/optimism/op-node/rollup/derive" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" - - "github.com/ethereum/go-ethereum" -) - -var _ ethereum.Subscription = &depositTxSubscription{} - -type depositTxSubscription struct { - logSubscription ethereum.Subscription - logCh chan types.Log - errCh chan error - doneCh chan struct{} -} - -type DepositChannels struct { - DepositTxCh chan<- *types.DepositTx - LogCh chan<- types.Log -} - -func (d *depositTxSubscription) Unsubscribe() { - // since multiple opsims run subcription to indexer multiple times, a select needs to be added to avoid any race condition leading to a panic - select { - case <-d.doneCh: - return - default: - d.logSubscription.Unsubscribe() - close(d.doneCh) - } -} - -func (d *depositTxSubscription) Err() <-chan error { - return d.errCh -} - -type LogSubscriber interface { - SubscribeFilterLogs(context.Context, ethereum.FilterQuery, chan<- types.Log) (ethereum.Subscription, error) -} - -// transforms Deposit event logs into DepositTx -func SubscribeDepositTx(ctx context.Context, logSub LogSubscriber, depositContractAddr common.Address, channels DepositChannels) (ethereum.Subscription, error) { - logCh := make(chan types.Log) - filterQuery := ethereum.FilterQuery{Addresses: []common.Address{depositContractAddr}, Topics: [][]common.Hash{{derive.DepositEventABIHash}}} - logSubscription, err := logSub.SubscribeFilterLogs(ctx, filterQuery, logCh) - if err != nil { - return nil, fmt.Errorf("failed to create log subscription: %w", err) - } - - errCh := make(chan error) - doneCh := make(chan struct{}) - logErrCh := logSubscription.Err() - - go func() { - defer close(logCh) - defer close(errCh) - for { - select { - case log := <-logCh: - dep, err := logToDepositTx(&log) - if err != nil { - errCh <- err - continue - } - - channels.DepositTxCh <- dep - channels.LogCh <- log - case err := <-logErrCh: - errCh <- fmt.Errorf("log subscription error: %w", err) - case <-ctx.Done(): - return - case <-doneCh: - return - } - } - }() - - return &depositTxSubscription{logSubscription, logCh, errCh, doneCh}, nil -} - -func logToDepositTx(log *types.Log) (*types.DepositTx, error) { - if len(log.Topics) > 0 && log.Topics[0] == derive.DepositEventABIHash { - dep, err := derive.UnmarshalDepositLogEvent(log) - if err != nil { - return nil, err - } - return dep, nil - } else { - return nil, errors.New("log is not a deposit event") - } -} diff --git a/opsimulator/indexer.go b/opsimulator/indexer.go index 0993182d7..fa1b1b8b3 100644 --- a/opsimulator/indexer.go +++ b/opsimulator/indexer.go @@ -2,17 +2,22 @@ package opsimulator import ( "context" + "errors" "fmt" "github.com/asaskevich/EventBus" + "github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-service/tasks" "github.com/ethereum-optimism/supersim/config" + "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/log" ) +var _ ethereum.Subscription = &depositTxSubscription{} + type L1ToL2MessageIndexer struct { log log.Logger storeManager *L1DepositStoreManager @@ -22,6 +27,38 @@ type L1ToL2MessageIndexer struct { tasksCtx context.Context tasksCancel context.CancelFunc ethClient *ethclient.Client + chains map[uint64]config.Chain +} + +type depositTxSubscription struct { + logSubscription ethereum.Subscription + logCh chan types.Log + errCh chan error + doneCh chan struct{} +} + +type DepositChannels struct { + DepositTxCh chan<- *types.DepositTx + LogCh chan<- types.Log +} + +func (d *depositTxSubscription) Unsubscribe() { + // since multiple opsims run subcription to indexer multiple times, a select needs to be added to avoid any race condition leading to a panic + select { + case <-d.doneCh: + return + default: + d.logSubscription.Unsubscribe() + close(d.doneCh) + } +} + +func (d *depositTxSubscription) Err() <-chan error { + return d.errCh +} + +type LogSubscriber interface { + SubscribeFilterLogs(context.Context, ethereum.FilterQuery, chan<- types.Log) (ethereum.Subscription, error) } func NewL1ToL2MessageIndexer(log log.Logger, storeManager *L1DepositStoreManager) *L1ToL2MessageIndexer { @@ -41,10 +78,20 @@ func NewL1ToL2MessageIndexer(log log.Logger, storeManager *L1DepositStoreManager } } -func (i *L1ToL2MessageIndexer) Start(ctx context.Context, client *ethclient.Client, l2Chain config.Chain) error { +func (i *L1ToL2MessageIndexer) Start(ctx context.Context, client *ethclient.Client, l2Chains map[uint64]config.Chain) error { - i.l2Chain = l2Chain + i.chains = l2Chains + + for _, chain := range i.chains { + if err := i.startForChain(ctx, client, chain); err != nil { + return fmt.Errorf("Failed to start L1 to L2 indexer") + } + } + + return nil +} +func (i *L1ToL2MessageIndexer) startForChain(ctx context.Context, client *ethclient.Client, chain config.Chain) error { i.tasks.Go(func() error { depositTxCh := make(chan *types.DepositTx) logCh := make(chan types.Log) @@ -57,7 +104,7 @@ func (i *L1ToL2MessageIndexer) Start(ctx context.Context, client *ethclient.Clie LogCh: logCh, } - portalAddress := common.Address(l2Chain.Config().L2Config.L1Addresses.OptimismPortalProxy) + portalAddress := common.Address(chain.Config().L2Config.L1Addresses.OptimismPortalProxy) sub, err := SubscribeDepositTx(i.tasksCtx, client, portalAddress, channels) if err != nil { @@ -128,3 +175,55 @@ func (i *L1ToL2MessageIndexer) processEvent(dep *types.DepositTx, log types.Log, i.eb.Publish(depositMessageInfoKey(chainID), depTx) return nil } + +// transforms Deposit event logs into DepositTx +func SubscribeDepositTx(ctx context.Context, logSub LogSubscriber, depositContractAddr common.Address, channels DepositChannels) (ethereum.Subscription, error) { + logCh := make(chan types.Log) + filterQuery := ethereum.FilterQuery{Addresses: []common.Address{depositContractAddr}, Topics: [][]common.Hash{{derive.DepositEventABIHash}}} + logSubscription, err := logSub.SubscribeFilterLogs(ctx, filterQuery, logCh) + if err != nil { + return nil, fmt.Errorf("failed to create log subscription: %w", err) + } + + errCh := make(chan error) + doneCh := make(chan struct{}) + logErrCh := logSubscription.Err() + + go func() { + defer close(logCh) + defer close(errCh) + for { + select { + case log := <-logCh: + dep, err := logToDepositTx(&log) + if err != nil { + errCh <- err + continue + } + + channels.DepositTxCh <- dep + channels.LogCh <- log + case err := <-logErrCh: + errCh <- fmt.Errorf("log subscription error: %w", err) + case <-ctx.Done(): + return + case <-doneCh: + return + } + } + }() + + return &depositTxSubscription{logSubscription, logCh, errCh, doneCh}, nil +} + +func logToDepositTx(log *types.Log) (*types.DepositTx, error) { + if len(log.Topics) > 0 && log.Topics[0] == derive.DepositEventABIHash { + dep, err := derive.UnmarshalDepositLogEvent(log) + if err != nil { + return nil, err + } + return dep, nil + } else { + return nil, errors.New("log is not a deposit event") + } +} diff --git a/opsimulator/deposits_test.go b/opsimulator/indexer_test.go similarity index 60% rename from opsimulator/deposits_test.go rename to opsimulator/indexer_test.go index 662ca0194..b40e19f16 100644 --- a/opsimulator/deposits_test.go +++ b/opsimulator/indexer_test.go @@ -7,6 +7,7 @@ import ( "testing" "github.com/ethereum-optimism/optimism/op-node/rollup/derive" + oplog "github.com/ethereum-optimism/optimism/op-service/log" optestutils "github.com/ethereum-optimism/optimism/op-service/testutils" "github.com/ethereum-optimism/supersim/config" @@ -92,3 +93,53 @@ func TestSubscribeDepositTx(t *testing.T) { close(depositTxCh) } + +func TestSubscribePublishTx(t *testing.T) { + + depositStoreMngr := NewL1DepositStoreManager() + indexer := NewL1ToL2MessageIndexer(oplog.NewLogger(oplog.AppOut(nil), oplog.DefaultCLIConfig()), depositStoreMngr) + + mockDepositTxs := createMockDepositTxs() + chain := MockChainWithSubscriptions{testutils.NewMockChain(), mockDepositTxs} + + ctx := context.Background() + + depositTxCh := make(chan *types.DepositTx, len(mockDepositTxs)) + logCh := make(chan types.Log, len(mockDepositTxs)) + + channels := DepositChannels{ + DepositTxCh: depositTxCh, + LogCh: logCh, + } + + sub, err := SubscribeDepositTx(ctx, &chain, common.HexToAddress(""), channels) + if err != nil { + require.NoError(t, err) + } + + depositPubTxCh := make(chan *types.Transaction, len(mockDepositTxs)) + + unsubscribe, err := indexer.SubscribeDepositMessage(chain.Config().ChainID, depositPubTxCh) + + require.NoError(t, err, "Should subscribe via chainId") + + // for i := 0; i < len(mockDepositTxs); i++ { + dep := <-depositPubTxCh + + t.Logf("Transaction: %+v\n", dep) + + // //Source hash is lost in the marshal process + // // require.Equal(t, dep.From, mockDepositTxs[i].From) + // // require.Equal(t, dep.To, mockDepositTxs[i].To) + // // require.Equal(t, dep.Mint, mockDepositTxs[i].Mint) + // // require.Equal(t, dep.Value, mockDepositTxs[i].Value) + // // require.Equal(t, dep.Gas, mockDepositTxs[i].Gas) + // // require.Equal(t, dep.IsSystemTransaction, mockDepositTxs[i].IsSystemTransaction) + // // require.Equal(t, dep.Data, mockDepositTxs[i].Data) + + // } + + unsubscribe() + sub.Unsubscribe() + close(depositTxCh) +} diff --git a/opsimulator/opsimulator.go b/opsimulator/opsimulator.go index 03cc6e066..e0a77e4cc 100644 --- a/opsimulator/opsimulator.go +++ b/opsimulator/opsimulator.go @@ -63,7 +63,7 @@ type OpSimulator struct { } // OpSimulator wraps around the l2 chain. By embedding `Chain`, it also implements the same inteface -func New(log log.Logger, closeApp context.CancelCauseFunc, port uint64, host string, l1Chain, l2Chain config.Chain, peers map[uint64]config.Chain, interopDelay uint64, storeManager *L1DepositStoreManager) *OpSimulator { +func New(log log.Logger, closeApp context.CancelCauseFunc, port uint64, host string, l1Chain, l2Chain config.Chain, peers map[uint64]config.Chain, interopDelay uint64) *OpSimulator { bgTasksCtx, bgTasksCancel := context.WithCancel(context.Background()) crossL2Inbox, err := bindings.NewCrossL2Inbox(predeploys.CrossL2InboxAddr, l2Chain.EthClient()) @@ -71,12 +71,10 @@ func New(log log.Logger, closeApp context.CancelCauseFunc, port uint64, host str closeApp(fmt.Errorf("failed to create cross L2 inbox: %w", err)) } - newLog := log.New("chain.id", l2Chain.Config().ChainID) - return &OpSimulator{ Chain: l2Chain, - log: newLog, + log: log.New("chain.id", l2Chain.Config().ChainID), port: port, host: host, l1Chain: l1Chain, @@ -92,12 +90,11 @@ func New(log log.Logger, closeApp context.CancelCauseFunc, port uint64, host str }, }, - peers: peers, - indexer: NewL1ToL2MessageIndexer(newLog, storeManager), + peers: peers, } } -func (opSim *OpSimulator) Start(ctx context.Context) error { +func (opSim *OpSimulator) Start(ctx context.Context, indexer *L1ToL2MessageIndexer) error { mux := http.NewServeMux() mux.Handle("/", corsHandler(opSim.handler(ctx))) @@ -110,6 +107,8 @@ func (opSim *OpSimulator) Start(ctx context.Context) error { opSim.log.Debug("started opsimulator", "name", cfg.Name, "chain.id", cfg.ChainID, "addr", hs.Addr()) opSim.httpServer = hs + opSim.indexer = indexer + if opSim.port == 0 { _, portStr, err := net.SplitHostPort(hs.Addr().String()) if err != nil { @@ -128,10 +127,6 @@ func (opSim *OpSimulator) Start(ctx context.Context) error { return fmt.Errorf("failed to create eth client: %w", err) } - if err := opSim.indexer.Start(ctx, opSim.l1Chain.EthClient(), opSim.Chain); err != nil { - return fmt.Errorf("L1ToL2Indexer failed to start: %w", err) - } - opSim.ethClient = ethClient opSim.startBackgroundTasks() return nil @@ -145,10 +140,6 @@ func (opSim *OpSimulator) Stop(ctx context.Context) error { return nil // someone else stopped } - if err := opSim.indexer.Stop(ctx); err != nil { - return errors.New("Failed to stop L1ToL2Indexer") - } - opSim.bgTasksCancel() if opSim.httpServer != nil { return opSim.httpServer.Stop(ctx) diff --git a/orchestrator/orchestrator.go b/orchestrator/orchestrator.go index 5d1f09d6c..854c8d4dd 100644 --- a/orchestrator/orchestrator.go +++ b/orchestrator/orchestrator.go @@ -30,6 +30,7 @@ type Orchestrator struct { l2ToL2MsgIndexer *interop.L2ToL2MessageIndexer l2ToL2MsgRelayer *interop.L2ToL2MessageRelayer + l1ToL2MsgIndexer *opsimulator.L1ToL2MessageIndexer AdminServer *admin.AdminServer } @@ -58,8 +59,6 @@ func NewOrchestrator(log log.Logger, closeApp context.CancelCauseFunc, cliConfig l2Anvils[cfg.ChainID] = l2Anvil } - depositStoreMngr := opsimulator.NewL1DepositStoreManager() - // Sping up OpSim to fornt the L2 instances for i := range networkConfig.L2Configs { cfg := networkConfig.L2Configs[i] @@ -74,6 +73,9 @@ func NewOrchestrator(log log.Logger, closeApp context.CancelCauseFunc, cliConfig o := Orchestrator{log: log, config: networkConfig, l1Chain: l1Anvil, l2Chains: l2Anvils, l2OpSims: l2OpSims} + depositStoreMngr := opsimulator.NewL1DepositStoreManager() + o.l1ToL2MsgIndexer := opsimulator.NewL1ToL2MessageIndexer(log, depositStoreMngr) + // Interop Setup if networkConfig.InteropEnabled { o.l2ToL2MsgIndexer = interop.NewL2ToL2MessageIndexer(log) @@ -92,6 +94,23 @@ func NewOrchestrator(log log.Logger, closeApp context.CancelCauseFunc, cliConfig func (o *Orchestrator) Start(ctx context.Context) error { o.log.Debug("starting orchestrator") + // TODO: hack until opsim proxy supports websocket connections. + // We need websocket connections to make subscriptions. + // We should try to use make RPC through opsim not directly to the underlying chain + l2ChainClientByChainId := make(map[uint64]*ethclient.Client) + l2OpSimClientByChainId := make(map[uint64]*ethclient.Client) + l2ChainByChainId := make(map[uint64]config.Chain) + + for chainID, opSim := range o.l2OpSims { + l2ChainClientByChainId[chainID] = opSim.Chain.EthClient() + l2OpSimClientByChainId[chainID] = opSim.EthClient() + l2ChainByChainId[chainID] = opSim.Chain + } + + if err := o.l1ToL2MsgIndexer.Start(ctx, o.l1Chain.EthClient(), l2ChainByChainId); err != nil { + return fmt.Errorf("l1 to l2 message indexer failed to start: %w", err) + } + // Start Chains if err := o.l1Chain.Start(ctx); err != nil { return fmt.Errorf("l1 chain %s failed to start: %w", o.l1Chain.Config().Name, err) @@ -102,7 +121,7 @@ func (o *Orchestrator) Start(ctx context.Context) error { } } for _, opSim := range o.l2OpSims { - if err := opSim.Start(ctx); err != nil { + if err := opSim.Start(ctx, o.l1ToL2MsgIndexer); err != nil { return fmt.Errorf("op simulator instance %s failed to start: %w", opSim.Config().Name, err) } } @@ -112,16 +131,6 @@ func (o *Orchestrator) Start(ctx context.Context) error { return fmt.Errorf("unable to start mining: %w", err) } - // TODO: hack until opsim proxy supports websocket connections. - // We need websocket connections to make subscriptions. - // We should try to use make RPC through opsim not directly to the underlying chain - l2ChainClientByChainId := make(map[uint64]*ethclient.Client) - l2OpSimClientByChainId := make(map[uint64]*ethclient.Client) - for chainID, opSim := range o.l2OpSims { - l2ChainClientByChainId[chainID] = opSim.Chain.EthClient() - l2OpSimClientByChainId[chainID] = opSim.EthClient() - } - // Configure Interop (if applicable) if o.config.InteropEnabled { o.log.Info("configuring interop contracts") @@ -181,6 +190,10 @@ func (o *Orchestrator) Stop(ctx context.Context) error { } } + if err := o.indexer.Stop(ctx); err != nil { + errs = append(errs, fmt.Errorf("l1 to l2 message indexer failed to stop: %w", err)) + } + for _, opSim := range o.l2OpSims { o.log.Debug("stopping op simulator", "chain.id", opSim.Config().ChainID) if err := opSim.Stop(ctx); err != nil { From 503f6171a240eb177a942dcfcd1f8bcbc4083913 Mon Sep 17 00:00:00 2001 From: serverConnected Date: Thu, 2 Jan 2025 22:32:06 +0530 Subject: [PATCH 17/22] inital test case - done --- opsimulator/indexer.go | 4 ++-- opsimulator/indexer_test.go | 43 ++++++++++++++++++++++--------------- 2 files changed, 28 insertions(+), 19 deletions(-) diff --git a/opsimulator/indexer.go b/opsimulator/indexer.go index fa1b1b8b3..19cac26fb 100644 --- a/opsimulator/indexer.go +++ b/opsimulator/indexer.go @@ -117,7 +117,7 @@ func (i *L1ToL2MessageIndexer) startForChain(ctx context.Context, client *ethcli select { case dep := <-depositTxCh: log := <-logCh - if err := i.processEvent(dep, log, chainID); err != nil { + if err := i.ProcessEvent(dep, log, chainID); err != nil { fmt.Printf("failed to process log: %v\n", err) } @@ -157,7 +157,7 @@ func (i *L1ToL2MessageIndexer) createSubscription(key string, depositMessageChan }, nil } -func (i *L1ToL2MessageIndexer) processEvent(dep *types.DepositTx, log types.Log, chainID uint64) error { +func (i *L1ToL2MessageIndexer) ProcessEvent(dep *types.DepositTx, log types.Log, chainID uint64) error { depTx := types.NewTx(dep) i.log.Info("observed deposit event on L1", "hash", depTx.Hash().String(), "SourceHash", dep.SourceHash.String()) diff --git a/opsimulator/indexer_test.go b/opsimulator/indexer_test.go index b40e19f16..3cca8325f 100644 --- a/opsimulator/indexer_test.go +++ b/opsimulator/indexer_test.go @@ -102,6 +102,12 @@ func TestSubscribePublishTx(t *testing.T) { mockDepositTxs := createMockDepositTxs() chain := MockChainWithSubscriptions{testutils.NewMockChain(), mockDepositTxs} + depositPubTxCh := make(chan *types.Transaction, len(mockDepositTxs)) + + unsubscribe, err := indexer.SubscribeDepositMessage(chain.Config().ChainID, depositPubTxCh) + + require.NoError(t, err, "Should subscribe via chainId") + ctx := context.Background() depositTxCh := make(chan *types.DepositTx, len(mockDepositTxs)) @@ -117,27 +123,30 @@ func TestSubscribePublishTx(t *testing.T) { require.NoError(t, err) } - depositPubTxCh := make(chan *types.Transaction, len(mockDepositTxs)) - - unsubscribe, err := indexer.SubscribeDepositMessage(chain.Config().ChainID, depositPubTxCh) - - require.NoError(t, err, "Should subscribe via chainId") + for i := 0; i < len(mockDepositTxs); i++ { + dep := <-depositTxCh + log := <-logCh - // for i := 0; i < len(mockDepositTxs); i++ { - dep := <-depositPubTxCh + err := indexer.ProcessEvent(dep, log, chain.Config().ChainID) + require.NoError(t, err, "Should send valid details") - t.Logf("Transaction: %+v\n", dep) + // Source hash is lost in the marshal process + // require.Equal(t, dep.From, mockDepositTxs[i].From) + // require.Equal(t, dep.To, mockDepositTxs[i].To) + // require.Equal(t, dep.Mint, mockDepositTxs[i].Mint) + // require.Equal(t, dep.Value, mockDepositTxs[i].Value) + // require.Equal(t, dep.Gas, mockDepositTxs[i].Gas) + // require.Equal(t, dep.IsSystemTransaction, mockDepositTxs[i].IsSystemTransaction) + // require.Equal(t, dep.Data, mockDepositTxs[i].Data) - // //Source hash is lost in the marshal process - // // require.Equal(t, dep.From, mockDepositTxs[i].From) - // // require.Equal(t, dep.To, mockDepositTxs[i].To) - // // require.Equal(t, dep.Mint, mockDepositTxs[i].Mint) - // // require.Equal(t, dep.Value, mockDepositTxs[i].Value) - // // require.Equal(t, dep.Gas, mockDepositTxs[i].Gas) - // // require.Equal(t, dep.IsSystemTransaction, mockDepositTxs[i].IsSystemTransaction) - // // require.Equal(t, dep.Data, mockDepositTxs[i].Data) + } - // } + for i := 0; i < len(mockDepositTxs); i++ { + dep := <-depositPubTxCh + depTx := types.NewTx(mockDepositTxs[i]) + t.Log(dep.Hash().String()) + t.Log(depTx.Hash().String()) + } unsubscribe() sub.Unsubscribe() From 000f5c9ca3691e5a031e7874b9cbcd7fa4d196d9 Mon Sep 17 00:00:00 2001 From: serverConnected Date: Sun, 5 Jan 2025 14:40:54 +0530 Subject: [PATCH 18/22] finished updated test case --- opsimulator/indexer_test.go | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/opsimulator/indexer_test.go b/opsimulator/indexer_test.go index 3cca8325f..42596fb42 100644 --- a/opsimulator/indexer_test.go +++ b/opsimulator/indexer_test.go @@ -123,29 +123,29 @@ func TestSubscribePublishTx(t *testing.T) { require.NoError(t, err) } + initiatedDepTxn := make([]*types.Transaction, len(mockDepositTxs)) + for i := 0; i < len(mockDepositTxs); i++ { dep := <-depositTxCh log := <-logCh + depTx := types.NewTx(dep) + + initiatedDepTxn[i] = depTx err := indexer.ProcessEvent(dep, log, chain.Config().ChainID) require.NoError(t, err, "Should send valid details") - - // Source hash is lost in the marshal process - // require.Equal(t, dep.From, mockDepositTxs[i].From) - // require.Equal(t, dep.To, mockDepositTxs[i].To) - // require.Equal(t, dep.Mint, mockDepositTxs[i].Mint) - // require.Equal(t, dep.Value, mockDepositTxs[i].Value) - // require.Equal(t, dep.Gas, mockDepositTxs[i].Gas) - // require.Equal(t, dep.IsSystemTransaction, mockDepositTxs[i].IsSystemTransaction) - // require.Equal(t, dep.Data, mockDepositTxs[i].Data) - } - for i := 0; i < len(mockDepositTxs); i++ { + for i := 0; i < len(initiatedDepTxn); i++ { dep := <-depositPubTxCh - depTx := types.NewTx(mockDepositTxs[i]) - t.Log(dep.Hash().String()) - t.Log(depTx.Hash().String()) + depTx := initiatedDepTxn[i] + + require.Equal(t, dep.To(), depTx.To()) + require.Equal(t, dep.IsDepositTx(), depTx.IsDepositTx()) + require.Equal(t, dep.Mint(), depTx.Mint()) + require.Equal(t, dep.SourceHash(), depTx.SourceHash()) + require.Equal(t, dep.Cost(), depTx.Cost()) + require.Equal(t, dep.Value(), depTx.Value()) } unsubscribe() From 392ef75467410f4e719d8abac11706d3e7799f6e Mon Sep 17 00:00:00 2001 From: serverConnected Date: Sun, 5 Jan 2025 16:31:19 +0530 Subject: [PATCH 19/22] updated indexer setup --- opsimulator/indexer.go | 3 ++- orchestrator/orchestrator.go | 36 +++++++++++++++++++----------------- supersim_test.go | 2 +- 3 files changed, 22 insertions(+), 19 deletions(-) diff --git a/opsimulator/indexer.go b/opsimulator/indexer.go index 19cac26fb..25ffa9f85 100644 --- a/opsimulator/indexer.go +++ b/opsimulator/indexer.go @@ -111,12 +111,13 @@ func (i *L1ToL2MessageIndexer) startForChain(ctx context.Context, client *ethcli return fmt.Errorf("failed to subscribe to deposit tx: %w", err) } - chainID := i.l2Chain.Config().ChainID + chainID := chain.Config().ChainID for { select { case dep := <-depositTxCh: log := <-logCh + i.log.Info("observed deposit event on L1", "deposit:", dep) if err := i.ProcessEvent(dep, log, chainID); err != nil { fmt.Printf("failed to process log: %v\n", err) } diff --git a/orchestrator/orchestrator.go b/orchestrator/orchestrator.go index 854c8d4dd..7e00006ff 100644 --- a/orchestrator/orchestrator.go +++ b/orchestrator/orchestrator.go @@ -63,7 +63,7 @@ func NewOrchestrator(log log.Logger, closeApp context.CancelCauseFunc, cliConfig for i := range networkConfig.L2Configs { cfg := networkConfig.L2Configs[i] - l2OpSims[cfg.ChainID] = opsimulator.New(log, closeApp, nextL2Port, cfg.Host, l1Anvil, l2Anvils[cfg.ChainID], l2Anvils, networkConfig.InteropDelay, depositStoreMngr) + l2OpSims[cfg.ChainID] = opsimulator.New(log, closeApp, nextL2Port, cfg.Host, l1Anvil, l2Anvils[cfg.ChainID], l2Anvils, networkConfig.InteropDelay) // only increment expected port if it has been specified if nextL2Port > 0 { @@ -74,7 +74,7 @@ func NewOrchestrator(log log.Logger, closeApp context.CancelCauseFunc, cliConfig o := Orchestrator{log: log, config: networkConfig, l1Chain: l1Anvil, l2Chains: l2Anvils, l2OpSims: l2OpSims} depositStoreMngr := opsimulator.NewL1DepositStoreManager() - o.l1ToL2MsgIndexer := opsimulator.NewL1ToL2MessageIndexer(log, depositStoreMngr) + o.l1ToL2MsgIndexer = opsimulator.NewL1ToL2MessageIndexer(log, depositStoreMngr) // Interop Setup if networkConfig.InteropEnabled { @@ -97,19 +97,6 @@ func (o *Orchestrator) Start(ctx context.Context) error { // TODO: hack until opsim proxy supports websocket connections. // We need websocket connections to make subscriptions. // We should try to use make RPC through opsim not directly to the underlying chain - l2ChainClientByChainId := make(map[uint64]*ethclient.Client) - l2OpSimClientByChainId := make(map[uint64]*ethclient.Client) - l2ChainByChainId := make(map[uint64]config.Chain) - - for chainID, opSim := range o.l2OpSims { - l2ChainClientByChainId[chainID] = opSim.Chain.EthClient() - l2OpSimClientByChainId[chainID] = opSim.EthClient() - l2ChainByChainId[chainID] = opSim.Chain - } - - if err := o.l1ToL2MsgIndexer.Start(ctx, o.l1Chain.EthClient(), l2ChainByChainId); err != nil { - return fmt.Errorf("l1 to l2 message indexer failed to start: %w", err) - } // Start Chains if err := o.l1Chain.Start(ctx); err != nil { @@ -131,6 +118,21 @@ func (o *Orchestrator) Start(ctx context.Context) error { return fmt.Errorf("unable to start mining: %w", err) } + l2ChainClientByChainId := make(map[uint64]*ethclient.Client) + l2OpSimClientByChainId := make(map[uint64]*ethclient.Client) + l2ChainByChainId := make(map[uint64]config.Chain) + + for chainID, opSim := range o.l2OpSims { + o.log.Info("HERERE IS THE ISSUE", "chain", opSim.Chain) + l2ChainClientByChainId[chainID] = opSim.Chain.EthClient() + l2OpSimClientByChainId[chainID] = opSim.EthClient() + l2ChainByChainId[chainID] = opSim.Chain + } + + if err := o.l1ToL2MsgIndexer.Start(ctx, o.l1Chain.EthClient(), l2ChainByChainId); err != nil { + return fmt.Errorf("l1 to l2 message indexer failed to start: %w", err) + } + // Configure Interop (if applicable) if o.config.InteropEnabled { o.log.Info("configuring interop contracts") @@ -190,7 +192,7 @@ func (o *Orchestrator) Stop(ctx context.Context) error { } } - if err := o.indexer.Stop(ctx); err != nil { + if err := o.l1ToL2MsgIndexer.Stop(ctx); err != nil { errs = append(errs, fmt.Errorf("l1 to l2 message indexer failed to stop: %w", err)) } @@ -247,7 +249,7 @@ func (o *Orchestrator) L1Chain() config.Chain { func (o *Orchestrator) L2Chains() []config.Chain { var chains []config.Chain - for _, chain := range o.l2OpSims { + for _, chain := range o.l2Chains { chains = append(chains, chain) } return chains diff --git a/supersim_test.go b/supersim_test.go index 6fabc82cd..2999c6131 100644 --- a/supersim_test.go +++ b/supersim_test.go @@ -1129,7 +1129,7 @@ func TestAdminGetL2ToL2MessageByMsgHash(t *testing.T) { func TestAdminGetL1ToL2MessageByTxnHash(t *testing.T) { t.Parallel() - testSuite := createTestSuite(t, &config.CLIConfig{}) + testSuite := createTestSuite(t) l1Chain := testSuite.Supersim.Orchestrator.L1Chain() l1EthClient, _ := ethclient.Dial(l1Chain.Endpoint()) From b98faf9501ae7b516812e2012169121f4b9b18de Mon Sep 17 00:00:00 2001 From: serverConnected Date: Sun, 5 Jan 2025 16:42:29 +0530 Subject: [PATCH 20/22] updated admin to use indexer to fetch message --- admin/admin.go | 20 ++++++++++---------- opsimulator/indexer.go | 8 ++++++-- orchestrator/orchestrator.go | 2 +- 3 files changed, 17 insertions(+), 13 deletions(-) diff --git a/admin/admin.go b/admin/admin.go index 51a3a357c..b5568becb 100644 --- a/admin/admin.go +++ b/admin/admin.go @@ -27,7 +27,7 @@ type AdminServer struct { networkConfig *config.NetworkConfig l2ToL2MsgIndexer *interop.L2ToL2MessageIndexer - l1DepositStore *opsimulator.L1DepositStoreManager + l1ToL2MsgIndexer *opsimulator.L1ToL2MessageIndexer port uint64 } @@ -36,7 +36,7 @@ type RPCMethods struct { log log.Logger networkConfig *config.NetworkConfig l2ToL2MsgIndexer *interop.L2ToL2MessageIndexer - l1DepositStore *opsimulator.L1DepositStoreManager + l1ToL2MsgIndexer *opsimulator.L1ToL2MessageIndexer } type JSONRPCError struct { @@ -89,12 +89,12 @@ func (err *JSONRPCError) ErrorCode() int { return err.Code } -func NewAdminServer(log log.Logger, port uint64, networkConfig *config.NetworkConfig, indexer *interop.L2ToL2MessageIndexer, l1DepositStore *opsimulator.L1DepositStoreManager) *AdminServer { +func NewAdminServer(log log.Logger, port uint64, networkConfig *config.NetworkConfig, l2ToL2MsgIndexer *interop.L2ToL2MessageIndexer, l1ToL2MsgIndexer *opsimulator.L1ToL2MessageIndexer) *AdminServer { - adminServer := &AdminServer{log: log, port: port, networkConfig: networkConfig, l1DepositStore: l1DepositStore} + adminServer := &AdminServer{log: log, port: port, networkConfig: networkConfig, l1ToL2MsgIndexer: l1ToL2MsgIndexer} - if networkConfig.InteropEnabled && indexer != nil { - adminServer.l2ToL2MsgIndexer = indexer + if networkConfig.InteropEnabled && l2ToL2MsgIndexer != nil { + adminServer.l2ToL2MsgIndexer = l2ToL2MsgIndexer } return adminServer @@ -171,7 +171,7 @@ func (s *AdminServer) setupRouter() *gin.Engine { log: s.log, networkConfig: s.networkConfig, l2ToL2MsgIndexer: s.l2ToL2MsgIndexer, - l1DepositStore: s.l1DepositStore, + l1ToL2MsgIndexer: s.l1ToL2MsgIndexer, } if err := rpcServer.RegisterName("admin", rpcMethods); err != nil { @@ -257,10 +257,10 @@ func (m *RPCMethods) GetL2ToL2MessageByMsgHash(args *common.Hash) (*JSONL2ToL2Me } func (m *RPCMethods) GetL1ToL2MessageByTxnHash(args *common.Hash) (*JSONDepositMessage, error) { - if m.l1DepositStore == nil { + if m.l1ToL2MsgIndexer == nil { return nil, &JSONRPCError{ Code: -32601, - Message: "L1DepositStoreManager is not initialized.", + Message: "L1ToL2MsgIndexer is not initialized.", } } @@ -271,7 +271,7 @@ func (m *RPCMethods) GetL1ToL2MessageByTxnHash(args *common.Hash) (*JSONDepositM } } - storeEntry, err := m.l1DepositStore.Get(*args) + storeEntry, err := m.l1ToL2MsgIndexer.Get(*args) if err != nil { return nil, &JSONRPCError{ diff --git a/opsimulator/indexer.go b/opsimulator/indexer.go index 25ffa9f85..49bafc75e 100644 --- a/opsimulator/indexer.go +++ b/opsimulator/indexer.go @@ -83,7 +83,7 @@ func (i *L1ToL2MessageIndexer) Start(ctx context.Context, client *ethclient.Clie i.chains = l2Chains for _, chain := range i.chains { - if err := i.startForChain(ctx, client, chain); err != nil { + if err := i.startForChain(client, chain); err != nil { return fmt.Errorf("Failed to start L1 to L2 indexer") } } @@ -91,7 +91,7 @@ func (i *L1ToL2MessageIndexer) Start(ctx context.Context, client *ethclient.Clie return nil } -func (i *L1ToL2MessageIndexer) startForChain(ctx context.Context, client *ethclient.Client, chain config.Chain) error { +func (i *L1ToL2MessageIndexer) startForChain(client *ethclient.Client, chain config.Chain) error { i.tasks.Go(func() error { depositTxCh := make(chan *types.DepositTx) logCh := make(chan types.Log) @@ -158,6 +158,10 @@ func (i *L1ToL2MessageIndexer) createSubscription(key string, depositMessageChan }, nil } +func (i *L1ToL2MessageIndexer) Get(msgHash common.Hash) (*L1DepositMessage, error) { + return i.storeManager.Get(msgHash) +} + func (i *L1ToL2MessageIndexer) ProcessEvent(dep *types.DepositTx, log types.Log, chainID uint64) error { depTx := types.NewTx(dep) diff --git a/orchestrator/orchestrator.go b/orchestrator/orchestrator.go index 7e00006ff..4fa4e8f1c 100644 --- a/orchestrator/orchestrator.go +++ b/orchestrator/orchestrator.go @@ -84,7 +84,7 @@ func NewOrchestrator(log log.Logger, closeApp context.CancelCauseFunc, cliConfig } } - a := admin.NewAdminServer(log, adminPort, networkConfig, o.l2ToL2MsgIndexer, depositStoreMngr) + a := admin.NewAdminServer(log, adminPort, networkConfig, o.l2ToL2MsgIndexer, o.l1ToL2MsgIndexer) o.AdminServer = a From 21444e5cc4c1f80148cd477987495345b050bc95 Mon Sep 17 00:00:00 2001 From: serverConnected Date: Sun, 5 Jan 2025 16:46:23 +0530 Subject: [PATCH 21/22] removal of log --- orchestrator/orchestrator.go | 1 - 1 file changed, 1 deletion(-) diff --git a/orchestrator/orchestrator.go b/orchestrator/orchestrator.go index 4fa4e8f1c..e1fc1267c 100644 --- a/orchestrator/orchestrator.go +++ b/orchestrator/orchestrator.go @@ -123,7 +123,6 @@ func (o *Orchestrator) Start(ctx context.Context) error { l2ChainByChainId := make(map[uint64]config.Chain) for chainID, opSim := range o.l2OpSims { - o.log.Info("HERERE IS THE ISSUE", "chain", opSim.Chain) l2ChainClientByChainId[chainID] = opSim.Chain.EthClient() l2OpSimClientByChainId[chainID] = opSim.EthClient() l2ChainByChainId[chainID] = opSim.Chain From fb3db88e655019972821159b9ac8ec299602156e Mon Sep 17 00:00:00 2001 From: serverConnected Date: Sun, 5 Jan 2025 16:48:16 +0530 Subject: [PATCH 22/22] comment update --- orchestrator/orchestrator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/orchestrator/orchestrator.go b/orchestrator/orchestrator.go index e1fc1267c..9f3e9818f 100644 --- a/orchestrator/orchestrator.go +++ b/orchestrator/orchestrator.go @@ -59,7 +59,7 @@ func NewOrchestrator(log log.Logger, closeApp context.CancelCauseFunc, cliConfig l2Anvils[cfg.ChainID] = l2Anvil } - // Sping up OpSim to fornt the L2 instances + // Sping up OpSim to front the L2 instances for i := range networkConfig.L2Configs { cfg := networkConfig.L2Configs[i]