From 46842a7d3f5fef42a01cc90eb9b34acd5444fcb6 Mon Sep 17 00:00:00 2001 From: Ethen Pociask Date: Thu, 2 Nov 2023 02:31:56 -0700 Subject: [PATCH 1/5] [etl_optimisations] Removed component, changed abstrations, & redid block syncing --- config.env.template | 2 +- docs/architecture/etl.markdown | 25 +- docs/heuristics.markdown | 2 +- internal/client/client.go | 36 +- internal/client/eth.go | 19 +- internal/core/config.go | 9 +- internal/core/constants.go | 2 +- internal/core/core_test.go | 4 +- internal/core/etl.go | 15 +- internal/core/id_test.go | 4 +- internal/core/register.go | 17 +- internal/engine/manager_test.go | 2 +- internal/engine/registry/balance_enforce.go | 34 +- .../engine/registry/balance_enforce_test.go | 9 +- internal/engine/registry/registry.go | 6 +- internal/etl/component/ingress_test.go | 8 +- internal/etl/component/oracle.go | 131 ------ internal/etl/component/pipe.go | 2 +- internal/etl/component/pipe_test.go | 6 +- internal/etl/component/reader.go | 128 ++++++ internal/etl/component/types.go | 6 +- internal/etl/pipeline/analysis_test.go | 4 +- internal/etl/pipeline/graph_test.go | 14 +- internal/etl/pipeline/manager.go | 19 +- internal/etl/pipeline/manager_test.go | 2 +- internal/etl/pipeline/pipeline.go | 6 +- internal/etl/pipeline/pipeline_test.go | 16 +- internal/etl/pipeline/store_test.go | 4 +- internal/etl/registry/block.go | 148 ++++++ .../registry/{pipe/event_log.go => event.go} | 12 +- .../{pipe/event_log_test.go => event_test.go} | 6 +- .../etl/registry/oracle/account_balance.go | 121 ----- internal/etl/registry/oracle/geth_block.go | 279 ------------ .../etl/registry/oracle/geth_block_test.go | 424 ------------------ internal/etl/registry/oracle/types.go | 5 - internal/etl/registry/registry.go | 35 +- internal/etl/registry/registry_test.go | 6 +- internal/metrics/metrics.go | 79 +++- internal/mocks/etl_manager.go | 12 +- internal/mocks/oracle.go | 22 +- internal/subsystem/manager.go | 2 +- internal/subsystem/manager_test.go | 8 +- 42 files changed, 536 insertions(+), 1155 deletions(-) delete mode 100644 internal/etl/component/oracle.go create mode 100644 internal/etl/component/reader.go create mode 100644 internal/etl/registry/block.go rename internal/etl/registry/{pipe/event_log.go => event.go} (95%) rename internal/etl/registry/{pipe/event_log_test.go => event_test.go} (96%) delete mode 100644 internal/etl/registry/oracle/account_balance.go delete mode 100644 internal/etl/registry/oracle/geth_block.go delete mode 100644 internal/etl/registry/oracle/geth_block_test.go delete mode 100644 internal/etl/registry/oracle/types.go diff --git a/config.env.template b/config.env.template index ad26bd20..248c37c1 100644 --- a/config.env.template +++ b/config.env.template @@ -2,7 +2,7 @@ L1_RPC_ENDPOINT= L2_RPC_ENDPOINT= -# Oracle Geth Block Poll Intervals (ms) +# Chain L1_POLL_INTERVAL=5000 L2_POLL_INTERVAL=5000 diff --git a/docs/architecture/etl.markdown b/docs/architecture/etl.markdown index 17ba0ce3..6cb32f64 100644 --- a/docs/architecture/etl.markdown +++ b/docs/architecture/etl.markdown @@ -16,7 +16,7 @@ A component refers to a graph node within the ETL system. Every component perfor Currently, there are three total component types: 1. `Pipe` - Used to perform local arbitrary computations _(e.g. Extracting L1Withdrawal transactions from a block)_ -2. `Oracle` - Used to poll and collect data from some third-party source _(e.g. Querying real-time account balance amounts from an op-geth execution client)_ +2. `Reader` - Used to poll and collect data from some third-party source _(e.g. Querying real-time account balance amounts from an op-geth execution client)_ 3. `Aggregator` - Used to synchronize events between asynchronous data sources _(e.g. Synchronizing L1/L2 blocks to understand real-time changes in bridging TVL)_ ### Inter-Connectivity @@ -97,20 +97,19 @@ Once input data processing has been completed, the output data is then submitted * Generating opcode traces for some EVM transaction * Parsing emitted events from a transaction -### Oracle +### Reader -Oracles are responsible for collecting data from some external third party _(e.g. L1 geth node, L2 rollup node, etc.)_. As of now, oracle's are configurable through the use of a standard `OracleDefinition` interface that allows developers to write arbitrary oracle logic. +Oracles are responsible for collecting data from some external third party _(e.g. L1 geth node, L2 rollup node, etc.)_. As of now, reader's are configurable through the use of a standard `OracleDefinition` interface that allows developers to write arbitrary reader logic. The following key interface functions are supported/enforced: * `ReadRoutine` - Routine used for reading/polling real-time data for some arbitrarily configured data source -* `BackTestRoutine` - _Optional_ routine used for sequentially backtesting from some starting to ending block heights. -Unlike other components, `Oracles` actually employ _2 go routines_ to safely operate. This is because the definition routines are run as a separate go routine with a communication channel to the actual `Oracle` event loop. This is visualized below: +Unlike other components, `Oracles` actually employ _2 go routines_ to safely operate. This is because the definition routines are run as a separate go routine with a communication channel to the actual `Reader` event loop. This is visualized below: {% raw %}
graph LR; - subgraph A[Oracle] + subgraph A[Reader] B[eventLoop]-->|channel|ODefRoutine; B[eventLoop]-->|context|ODefRoutine; B-->B; @@ -185,7 +184,7 @@ A registry submodule is used to store all ETL data register definitions that pro ## Addressing -Some component's require knowledge of a specific address to properly function. For example, an oracle that polls a geth node for native ETH balance amounts would need knowledge of the address to poll. To support this, the ETL leverages a shared state store between the ETL and Risk Engine subsystems. +Some component's require knowledge of a specific address to properly function. For example, an reader that polls a geth node for native ETH balance amounts would need knowledge of the address to poll. To support this, the ETL leverages a shared state store between the ETL and Risk Engine subsystems. Shown below is how the ETL and Risk Engine interact with the shared state store using a `BalanceOracle` component as an example: @@ -210,7 +209,7 @@ graph LR; GETH --> |"{4} []balance"|BO BO("Balance - Oracle") --> |"{1} Get(PUUID)"|state + Reader") --> |"{1} Get(PUUID)"|state BO -."eventLoop()".-> BO state --> |"{2} []address"|BO @@ -218,14 +217,14 @@ graph LR;
{% endraw %} -### Geth Block Oracle Register +### Geth Block Reader Register -A `GethBlock` register refers to a block output extracted from a go-ethereum node. This register is used for creating `Oracle` components that poll and extract block data from a go-ethereum node in real-time. +A `BlockHeader` register refers to a block output extracted from a go-ethereum node. This register is used for creating `Reader` components that poll and extract block data from a go-ethereum node in real-time. -### Geth Account Balance Oracle Register +### Geth Account Balance Reader Register -An `AccountBalance` register refers to a native ETH balance output extracted from a go-ethereum node. This register is used for creating `Oracle` components that poll and extract native ETH balance data for some state persisted addresses from a go-ethereum node in real-time. -Unlike, the `GethBlock` register, this register requires knowledge of an address set that's shared with the risk engine to properly function and is therefore addressable. Because of this, any heuristic that uses this register must also be addressable. +An `AccountBalance` register refers to a native ETH balance output extracted from a go-ethereum node. This register is used for creating `Reader` components that poll and extract native ETH balance data for some state persisted addresses from a go-ethereum node in real-time. +Unlike, the `BlockHeader` register, this register requires knowledge of an address set that's shared with the risk engine to properly function and is therefore addressable. Because of this, any heuristic that uses this register must also be addressable. ## Managed ETL diff --git a/docs/heuristics.markdown b/docs/heuristics.markdown index 48035f85..d75d36f4 100644 --- a/docs/heuristics.markdown +++ b/docs/heuristics.markdown @@ -111,7 +111,7 @@ curl --location --request POST 'http://localhost:8080/v0/heuristic' \ **NOTE:** This heuristic requires an active RPC connection to both L1 and L2 networks. Furthermore, the Pessimism implementation of fault-detector assumes that a submitted L2 output on L1 will correspond to a canonical block on L2. -The hardcoded `fault_detector` heuristic scans for active `OutputProposed` events on an L1 Output Oracle contract. Once an event is detected, the heuristic implementation proceeds to reconstruct a local state output for the corresponding L2 block. If there is a mismatch between the L1 output and the local state output, the heuristic alerts. +The hardcoded `fault_detector` heuristic scans for active `OutputProposed` events on an L1 Output Reader contract. Once an event is detected, the heuristic implementation proceeds to reconstruct a local state output for the corresponding L2 block. If there is a mismatch between the L1 output and the local state output, the heuristic alerts. ### Parameters diff --git a/internal/client/client.go b/internal/client/client.go index a072e1c4..1a506930 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -6,7 +6,8 @@ import ( "github.com/base-org/pessimism/internal/core" "github.com/base-org/pessimism/internal/logging" - indexer_client "github.com/ethereum-optimism/optimism/indexer/client" + ix_client "github.com/ethereum-optimism/optimism/indexer/client" + ix_node "github.com/ethereum-optimism/optimism/indexer/node" "go.uber.org/zap" ) @@ -15,13 +16,15 @@ type Config struct { L1RpcEndpoint string L2RpcEndpoint string - IndexerCfg *indexer_client.Config + IndexerCfg *ix_client.Config } // Bundle ... Used to store all client object references type Bundle struct { IndexerClient IndexerClient L1Client EthClient + L1Node ix_node.EthClient + L2Node ix_node.EthClient L2Client EthClient L2Geth GethClient } @@ -36,12 +39,24 @@ func NewBundle(ctx context.Context, cfg *Config) (*Bundle, error) { return nil, err } + l1NodeClient, err := NewNodeClient(ctx, cfg.L1RpcEndpoint) + if err != nil { + logger.Fatal("Error creating L1 node client", zap.Error(err)) + return nil, err + } + l2Client, err := NewEthClient(ctx, cfg.L2RpcEndpoint) if err != nil { logger.Fatal("Error creating L1 client", zap.Error(err)) return nil, err } + l2NodeClient, err := NewNodeClient(ctx, cfg.L2RpcEndpoint) + if err != nil { + logger.Fatal("Error creating L2 node client", zap.Error(err)) + return nil, err + } + l2Geth, err := NewGethClient(cfg.L2RpcEndpoint) if err != nil { logger.Fatal("Error creating L2 GETH client", zap.Error(err)) @@ -56,7 +71,9 @@ func NewBundle(ctx context.Context, cfg *Config) (*Bundle, error) { return &Bundle{ IndexerClient: indexerClient, L1Client: l1Client, + L1Node: l1NodeClient, L2Client: l2Client, + L2Node: l2NodeClient, L2Geth: l2Geth, }, nil } @@ -71,6 +88,21 @@ func FromContext(ctx context.Context) (*Bundle, error) { return b, nil } +// NodeClient ... +func (b *Bundle) NodeClient(n core.Network) (ix_node.EthClient, error) { + switch n { + case core.Layer1: + return b.L1Node, nil + + case core.Layer2: + return b.L2Node, nil + + default: + return nil, fmt.Errorf("invalid network supplied") + } + +} + // FromNetwork ... Retrieves an eth client from the context func FromNetwork(ctx context.Context, n core.Network) (EthClient, error) { bundle, err := FromContext(ctx) diff --git a/internal/client/eth.go b/internal/client/eth.go index 8c4d71c6..516e35ba 100644 --- a/internal/client/eth.go +++ b/internal/client/eth.go @@ -2,27 +2,18 @@ package client -/* - NOTE - eth client docs: https://pkg.go.dev/github.com/ethereum/go-ethereum/ethclient - eth api docs: https://geth.ethereum.org/docs/rpc/server -*/ - import ( "context" "math/big" + "github.com/base-org/pessimism/internal/metrics" + ix_node "github.com/ethereum-optimism/optimism/indexer/node" "github.com/ethereum/go-ethereum" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient" ) -// TODO (#20) : Introduce optional Retry-able EthClient - -// EthClient ... Provides interface wrapper for ethClient functions -// Useful for mocking go-ethereum json rpc client logic type EthClient interface { CallContract(ctx context.Context, msg ethereum.CallMsg, blockNumber *big.Int) ([]byte, error) CodeAt(ctx context.Context, account common.Address, blockNumber *big.Int) ([]byte, error) @@ -40,3 +31,9 @@ type EthClient interface { func NewEthClient(ctx context.Context, rawURL string) (EthClient, error) { return ethclient.DialContext(ctx, rawURL) } + +func NewNodeClient(ctx context.Context, rpcURL string) (ix_node.EthClient, error) { + stats := metrics.WithContext(ctx) + + return ix_node.DialEthClient(rpcURL, stats) +} diff --git a/internal/core/config.go b/internal/core/config.go index 3c18945a..76452e2c 100644 --- a/internal/core/config.go +++ b/internal/core/config.go @@ -5,7 +5,7 @@ import ( "time" ) -// ClientConfig ... Configuration passed through to an oracle component constructor +// ClientConfig ... Configuration passed through to an reader component constructor type ClientConfig struct { Network Network PollInterval time.Duration @@ -31,12 +31,7 @@ type PipelineConfig struct { ClientConfig *ClientConfig } -// Backfill ... Returns true if the oracle is configured to backfill +// Backfill ... Returns true if the reader is configured to backfill func (oc *ClientConfig) Backfill() bool { return oc.StartHeight != nil } - -// Backtest ... Returns true if the oracle is configured to backtest -func (oc *ClientConfig) Backtest() bool { - return oc.EndHeight != nil -} diff --git a/internal/core/constants.go b/internal/core/constants.go index c7405143..e8db918f 100644 --- a/internal/core/constants.go +++ b/internal/core/constants.go @@ -21,7 +21,7 @@ const ( Clients ) -// Network ... Represents the network for which a pipeline's oracle +// Network ... Represents the network for which a pipeline's reader // is subscribed to. type Network uint8 diff --git a/internal/core/core_test.go b/internal/core/core_test.go index f877c0bf..7a9d92e5 100644 --- a/internal/core/core_test.go +++ b/internal/core/core_test.go @@ -13,7 +13,7 @@ import ( func Test_TransitData(t *testing.T) { // Verify construction td := core.NewTransitData( - core.GethBlock, + core.BlockHeader, nil, ) @@ -33,7 +33,7 @@ func Test_EngineRelay(t *testing.T) { outChan := make(chan core.HeuristicInput) eir := core.NewEngineRelay(core.NilPUUID(), outChan) - dummyTD := core.NewTransitData(core.AccountBalance, nil) + dummyTD := core.NewTransitData(core.BlockHeader, nil) // Verify relay and wrapping diff --git a/internal/core/etl.go b/internal/core/etl.go index b5724956..32b05c02 100644 --- a/internal/core/etl.go +++ b/internal/core/etl.go @@ -4,22 +4,19 @@ package core type ComponentType uint8 const ( - Oracle ComponentType = iota + 1 - Pipe - Aggregator + Reader ComponentType = iota + 1 + Transformer ) // String ... Converts the component type to a string func (ct ComponentType) String() string { switch ct { - case Oracle: - return "oracle" + case Reader: + return "reader" - case Pipe: - return "pipe" + case Transformer: + return "transformer" - case Aggregator: - return "aggregator" } return UnknownType diff --git a/internal/core/id_test.go b/internal/core/id_test.go index 09f0b588..c9b32bde 100644 --- a/internal/core/id_test.go +++ b/internal/core/id_test.go @@ -14,7 +14,7 @@ func Test_Component_ID(t *testing.T) { assert.Equal(t, expectedPID, actualID.PID) - expectedStr := "layer1:backtest:oracle:account_balance" + expectedStr := "layer1:backtest:reader:account_balance" actualStr := actualID.PID.String() assert.Equal(t, expectedStr, actualStr) @@ -28,7 +28,7 @@ func Test_Pipeline_ID(t *testing.T) { assert.Equal(t, expectedID, actualID.PID) - expectedStr := "backtest::layer1:backtest:oracle:account_balance::layer1:backtest:oracle:account_balance" + expectedStr := "backtest::layer1:backtest:reader:account_balance::layer1:backtest:reader:account_balance" actualStr := actualID.PID.String() assert.Equal(t, expectedStr, actualStr) diff --git a/internal/core/register.go b/internal/core/register.go index a5e5e76d..ca60b436 100644 --- a/internal/core/register.go +++ b/internal/core/register.go @@ -4,8 +4,7 @@ package core type RegisterType uint8 const ( - AccountBalance RegisterType = iota + 1 - GethBlock + BlockHeader RegisterType = iota + 1 EventLog ) @@ -13,11 +12,9 @@ const ( // register enum func (rt RegisterType) String() string { switch rt { - case AccountBalance: - return "account_balance" - case GethBlock: - return "geth_block" + case BlockHeader: + return "block_header" case EventLog: return "event_log" @@ -32,10 +29,10 @@ type DataRegister struct { Addressing bool Sk *StateKey - DataType RegisterType - ComponentType ComponentType - ComponentConstructor interface{} - Dependencies []RegisterType + DataType RegisterType + ComponentType ComponentType + Constructor interface{} + Dependencies []RegisterType } // StateKey ... Returns a cloned state key for a data register diff --git a/internal/engine/manager_test.go b/internal/engine/manager_test.go index eaa14962..3d017c51 100644 --- a/internal/engine/manager_test.go +++ b/internal/engine/manager_test.go @@ -74,7 +74,7 @@ func Test_EventLoop(t *testing.T) { hi := core.HeuristicInput{ PUUID: testPUUID, Input: core.TransitData{ - Type: core.AccountBalance, + Type: core.BlockHeader, Address: common.HexToAddress("0x69"), Value: float64(666), }, diff --git a/internal/engine/registry/balance_enforce.go b/internal/engine/registry/balance_enforce.go index 91b25924..89fc2e6a 100644 --- a/internal/engine/registry/balance_enforce.go +++ b/internal/engine/registry/balance_enforce.go @@ -1,13 +1,16 @@ package registry import ( + "context" "encoding/json" "fmt" "time" + "github.com/base-org/pessimism/internal/client" "github.com/base-org/pessimism/internal/core" "github.com/base-org/pessimism/internal/engine/heuristic" "github.com/base-org/pessimism/internal/logging" + "github.com/ethereum/go-ethereum/core/types" "go.uber.org/zap" ) @@ -25,7 +28,9 @@ func (bi *BalanceInvConfig) Unmarshal(isp *core.SessionParams) error { // BalanceHeuristic ... type BalanceHeuristic struct { - cfg *BalanceInvConfig + ctx context.Context + cfg *BalanceInvConfig + client client.EthClient heuristic.Heuristic } @@ -41,10 +46,12 @@ const reportMsg = ` ` // NewBalanceHeuristic ... Initializer -func NewBalanceHeuristic(cfg *BalanceInvConfig) (heuristic.Heuristic, error) { +func NewBalanceHeuristic(ctx context.Context, cfg *BalanceInvConfig) (heuristic.Heuristic, error) { + return &BalanceHeuristic{ + ctx: ctx, cfg: cfg, - Heuristic: heuristic.NewBaseHeuristic(core.AccountBalance), + Heuristic: heuristic.NewBaseHeuristic(core.BlockHeader), }, nil } @@ -53,28 +60,35 @@ func NewBalanceHeuristic(cfg *BalanceInvConfig) (heuristic.Heuristic, error) { func (bi *BalanceHeuristic) Assess(td core.TransitData) (*core.Activation, bool, error) { logging.NoContext().Debug("Checking activation for balance heuristic", zap.String("data", fmt.Sprintf("%v", td))) - // 1. Validate and extract balance input - err := bi.ValidateInput(td) + header, ok := td.Value.(types.Header) + if !ok { + return nil, false, fmt.Errorf(couldNotCastErr, "BlockHeader") + } + + client, err := client.FromNetwork(bi.ctx, td.Network) if err != nil { return nil, false, err } - balance, ok := td.Value.(float64) - if !ok { - return nil, false, fmt.Errorf(couldNotCastErr, "float64") + // See if a tx changed the balance for the address + balance, err := client.BalanceAt(context.Background(), td.Address, header.Number) + if err != nil { + return nil, false, err } + ethBalance := float64(balance.Int64()) / 1000000000000000000 + activated := false // 2. Assess if balance > upper bound if bi.cfg.UpperBound != nil && - *bi.cfg.UpperBound < balance { + *bi.cfg.UpperBound < ethBalance { activated = true } // 3. Assess if balance < lower bound if bi.cfg.LowerBound != nil && - *bi.cfg.LowerBound > balance { + *bi.cfg.LowerBound > ethBalance { activated = true } diff --git a/internal/engine/registry/balance_enforce_test.go b/internal/engine/registry/balance_enforce_test.go index 52c21103..ada095bf 100644 --- a/internal/engine/registry/balance_enforce_test.go +++ b/internal/engine/registry/balance_enforce_test.go @@ -1,6 +1,7 @@ package registry_test import ( + "context" "testing" "github.com/base-org/pessimism/internal/core" @@ -12,7 +13,7 @@ func Test_Balance_Assess(t *testing.T) { upper := float64(5) lower := float64(1) - bi, err := registry.NewBalanceHeuristic( + bi, err := registry.NewBalanceHeuristic(context.Background(), ®istry.BalanceInvConfig{ Address: "0x123", UpperBound: &upper, @@ -23,7 +24,7 @@ func Test_Balance_Assess(t *testing.T) { // No activation testData1 := core.TransitData{ - Type: core.AccountBalance, + Type: core.BlockHeader, Value: float64(3), } @@ -33,7 +34,7 @@ func Test_Balance_Assess(t *testing.T) { // Upper bound activation testData2 := core.TransitData{ - Type: core.AccountBalance, + Type: core.BlockHeader, Value: float64(6), } @@ -43,7 +44,7 @@ func Test_Balance_Assess(t *testing.T) { // Lower bound activation testData3 := core.TransitData{ - Type: core.AccountBalance, + Type: core.BlockHeader, Value: float64(0.1), } diff --git a/internal/engine/registry/registry.go b/internal/engine/registry/registry.go index 938d3c0e..03971a52 100644 --- a/internal/engine/registry/registry.go +++ b/internal/engine/registry/registry.go @@ -27,7 +27,7 @@ func NewHeuristicTable() HeuristicTable { core.BalanceEnforcement: { PrepareValidate: ValidateAddressing, Policy: core.BothNetworks, - InputType: core.AccountBalance, + InputType: core.BlockHeader, Constructor: constructBalanceEnforcement, }, core.ContractEvent: { @@ -66,7 +66,7 @@ func constructEventInv(_ context.Context, isp *core.SessionParams) (heuristic.He } // constructBalanceEnforcement ... Constructs a balance heuristic instance -func constructBalanceEnforcement(_ context.Context, isp *core.SessionParams) (heuristic.Heuristic, error) { +func constructBalanceEnforcement(ctx context.Context, isp *core.SessionParams) (heuristic.Heuristic, error) { cfg := &BalanceInvConfig{} err := cfg.Unmarshal(isp) @@ -74,7 +74,7 @@ func constructBalanceEnforcement(_ context.Context, isp *core.SessionParams) (he return nil, err } - return NewBalanceHeuristic(cfg) + return NewBalanceHeuristic(ctx, cfg) } // constructFaultDetector ... Constructs a fault detector heuristic instance diff --git a/internal/etl/component/ingress_test.go b/internal/etl/component/ingress_test.go index 6f1d03bc..a7236822 100644 --- a/internal/etl/component/ingress_test.go +++ b/internal/etl/component/ingress_test.go @@ -27,7 +27,7 @@ func Test_Add_Remove_Ingress(t *testing.T) { testLogic: func(t *testing.T, ih *ingressHandler) { - err := ih.createIngress(core.GethBlock) + err := ih.createIngress(core.BlockHeader) assert.NoError(t, err, "geth.block register should added as an egress") }, @@ -38,7 +38,7 @@ func Test_Add_Remove_Ingress(t *testing.T) { constructionLogic: func() *ingressHandler { handler := newIngressHandler() - if err := handler.createIngress(core.GethBlock); err != nil { + if err := handler.createIngress(core.BlockHeader); err != nil { panic(err) } @@ -46,10 +46,10 @@ func Test_Add_Remove_Ingress(t *testing.T) { }, testLogic: func(t *testing.T, ih *ingressHandler) { - err := ih.createIngress(core.GethBlock) + err := ih.createIngress(core.BlockHeader) assert.Error(t, err, "geth.block register should fail to be added") - assert.Equal(t, err.Error(), fmt.Sprintf(ingressAlreadyExistsErr, core.GethBlock.String())) + assert.Equal(t, err.Error(), fmt.Sprintf(ingressAlreadyExistsErr, core.BlockHeader.String())) }, }, diff --git a/internal/etl/component/oracle.go b/internal/etl/component/oracle.go deleted file mode 100644 index 17980d2a..00000000 --- a/internal/etl/component/oracle.go +++ /dev/null @@ -1,131 +0,0 @@ -package component - -import ( - "context" - "math/big" - "sync" - "time" - - "github.com/base-org/pessimism/internal/core" - "github.com/base-org/pessimism/internal/logging" - "github.com/base-org/pessimism/internal/metrics" - "go.uber.org/zap" -) - -// OracleDefinition ... Provides a generalized interface for developers to bind their own functionality to -type OracleDefinition interface { - BackTestRoutine(ctx context.Context, componentChan chan core.TransitData, - startHeight *big.Int, endHeight *big.Int) error - ReadRoutine(ctx context.Context, componentChan chan core.TransitData) error - Height() (*big.Int, error) -} - -// Oracle ... Component used to represent a data source reader; E.g, Eth block indexing, interval API polling -type Oracle struct { - ctx context.Context - - definition OracleDefinition - oracleChannel chan core.TransitData - - wg *sync.WaitGroup - - *metaData -} - -// NewOracle ... Initializer -func NewOracle(ctx context.Context, outType core.RegisterType, - od OracleDefinition, opts ...Option) (Component, error) { - o := &Oracle{ - ctx: ctx, - definition: od, - oracleChannel: core.NewTransitChannel(), - wg: &sync.WaitGroup{}, - - metaData: newMetaData(core.Oracle, outType), - } - - for _, opt := range opts { - opt(o.metaData) - } - - logging.WithContext(ctx).Info("Constructed component", - zap.String(logging.CUUIDKey, o.metaData.id.String())) - - return o, nil -} - -// Height ... Returns the current block height of the oracle -func (o *Oracle) Height() (*big.Int, error) { - return o.definition.Height() -} - -// Close ... This function is called at the end when processes related to oracle need to shut down -func (o *Oracle) Close() error { - logging.WithContext(o.ctx). - Info("Waiting for oracle definition go routines to finish", - zap.String(logging.CUUIDKey, o.id.String())) - o.closeChan <- killSig - - o.wg.Wait() - logging.WithContext(o.ctx).Info("Oracle definition go routines have exited", - zap.String(logging.CUUIDKey, o.id.String())) - return nil -} - -// EventLoop ... Component loop that actively waits and transits register data -// from a channel that the definition's read routine writes to -func (o *Oracle) EventLoop() error { - // TODO(#24) - Add Internal Component Activity State Tracking - - logger := logging.WithContext(o.ctx) - - logger.Debug("Starting component event loop", - zap.String(logging.CUUIDKey, o.id.String())) - - o.wg.Add(1) - - routineCtx, cancel := context.WithCancel(o.ctx) - // o.emitStateChange(Live) - - // Spawn definition read routine - go func() { - defer o.wg.Done() - if err := o.definition.ReadRoutine(routineCtx, o.oracleChannel); err != nil { - logger.Error("Received error from read routine", - zap.String(logging.CUUIDKey, o.id.String()), - zap.Error(err)) - } - }() - - for { - select { - case registerData := <-o.oracleChannel: - logger.Debug("Sending data", - zap.String(logging.CUUIDKey, o.id.String())) - - if err := o.egressHandler.Send(registerData); err != nil { - logger.Error(transitErr, zap.String("ID", o.id.String())) - } - - if o.egressHandler.PathEnd() { - latency := float64(time.Since(registerData.OriginTS).Milliseconds()) - metrics.WithContext(o.ctx). - RecordPipelineLatency(o.pUUID, latency) - } - - case <-o.closeChan: - logger.Debug("Received component shutdown signal", - zap.String(logging.CUUIDKey, o.id.String())) - - // o.emitStateChange(Terminated) - logger.Debug("Closing component channel and context", - zap.String(logging.CUUIDKey, o.id.String())) - close(o.oracleChannel) - cancel() // End definition routine - - logger.Debug("Component shutdown success", - zap.String(logging.CUUIDKey, o.id.String())) - return nil - } - } -} diff --git a/internal/etl/component/pipe.go b/internal/etl/component/pipe.go index 78086b23..a10203a0 100644 --- a/internal/etl/component/pipe.go +++ b/internal/etl/component/pipe.go @@ -36,7 +36,7 @@ func NewPipe(ctx context.Context, pd PipeDefinition, inType core.RegisterType, def: pd, inType: inType, - metaData: newMetaData(core.Pipe, outType), + metaData: newMetaData(core.Transformer, outType), } if err := pipe.createIngress(inType); err != nil { diff --git a/internal/etl/component/pipe_test.go b/internal/etl/component/pipe_test.go index 9acfcefc..d99a6b68 100644 --- a/internal/etl/component/pipe_test.go +++ b/internal/etl/component/pipe_test.go @@ -27,7 +27,7 @@ func Test_Pipe_Event_Flow(t *testing.T) { outputChan := make(chan core.TransitData) // Construct test component - testPipe, err := mocks.NewDummyPipe(ctx, core.GethBlock, core.EventLog) + testPipe, err := mocks.NewDummyPipe(ctx, core.BlockHeader, core.EventLog) assert.NoError(t, err) err = testPipe.AddEgress(testID, outputChan) @@ -51,7 +51,7 @@ func Test_Pipe_Event_Flow(t *testing.T) { inputData := core.TransitData{ Timestamp: ts, - Type: core.GethBlock, + Type: core.BlockHeader, Value: block, } var outputData core.TransitData @@ -69,7 +69,7 @@ func Test_Pipe_Event_Flow(t *testing.T) { }() - entryChan, err := testPipe.GetIngress(core.GethBlock) + entryChan, err := testPipe.GetIngress(core.BlockHeader) assert.NoError(t, err) entryChan <- inputData diff --git a/internal/etl/component/reader.go b/internal/etl/component/reader.go new file mode 100644 index 00000000..f5caed3a --- /dev/null +++ b/internal/etl/component/reader.go @@ -0,0 +1,128 @@ +package component + +import ( + "context" + "math/big" + "sync" + "time" + + "github.com/base-org/pessimism/internal/core" + "github.com/base-org/pessimism/internal/logging" + "github.com/base-org/pessimism/internal/metrics" + "go.uber.org/zap" +) + +// ReadRoutine ... +type ReadRoutine interface { + Loop(ctx context.Context, componentChan chan core.TransitData) error + Height() (*big.Int, error) +} + +// ChainReader ... +type ChainReader struct { + ctx context.Context + + routine ReadRoutine + oracleChannel chan core.TransitData + + wg *sync.WaitGroup + + *metaData +} + +// NewReader ... Initializer +func NewReader(ctx context.Context, outType core.RegisterType, + rr ReadRoutine, opts ...Option) (Component, error) { + cr := &ChainReader{ + ctx: ctx, + routine: rr, + oracleChannel: core.NewTransitChannel(), + wg: &sync.WaitGroup{}, + + metaData: newMetaData(core.Reader, outType), + } + + for _, opt := range opts { + opt(cr.metaData) + } + + logging.WithContext(ctx).Info("Constructed component", + zap.String(logging.CUUIDKey, cr.metaData.id.String())) + + return cr, nil +} + +// Height ... Returns the current block height of the chain read routine +func (cr *ChainReader) Height() (*big.Int, error) { + return cr.routine.Height() +} + +// Close ... This function is called at the end when processes related to reader need to shut down +func (cr *ChainReader) Close() error { + logging.WithContext(cr.ctx). + Info("Waiting for reader definition go routines to finish", + zap.String(logging.CUUIDKey, cr.id.String())) + cr.closeChan <- killSig + + cr.wg.Wait() + logging.WithContext(cr.ctx).Info("Reader definition go routines have exited", + zap.String(logging.CUUIDKey, cr.id.String())) + return nil +} + +// EventLoop ... +func (cr *ChainReader) EventLoop() error { + // TODO(#24) - Add Internal Component Activity State Tracking + + logger := logging.WithContext(cr.ctx) + + logger.Debug("Starting component event loop", + zap.String(logging.CUUIDKey, cr.id.String())) + + cr.wg.Add(1) + + routineCtx, cancel := context.WithCancel(cr.ctx) + // cr.emitStateChange(Live) + + // Spawn definition read routine + go func() { + defer cr.wg.Done() + if err := cr.routine.Loop(routineCtx, cr.oracleChannel); err != nil { + logger.Error("Received error from read routine", + zap.String(logging.CUUIDKey, cr.id.String()), + zap.Error(err)) + } + }() + + for { + select { + case registerData := <-cr.oracleChannel: + logger.Debug("Sending data", + zap.String(logging.CUUIDKey, cr.id.String())) + + if err := cr.egressHandler.Send(registerData); err != nil { + logger.Error(transitErr, zap.String("ID", cr.id.String())) + } + + if cr.egressHandler.PathEnd() { + latency := float64(time.Since(registerData.OriginTS).Milliseconds()) + metrics.WithContext(cr.ctx). + RecordPipelineLatency(cr.pUUID, latency) + } + + case <-cr.closeChan: + logger.Debug("Received component shutdown signal", + zap.String(logging.CUUIDKey, cr.id.String())) + + // cr.emitStateChange(Terminated) + logger.Debug("Closing component channel and context", + zap.String(logging.CUUIDKey, cr.id.String())) + close(cr.oracleChannel) + cancel() // End definition routine + + logger.Debug("Component shutdown success", + zap.String(logging.CUUIDKey, cr.id.String())) + return nil + } + } +} diff --git a/internal/etl/component/types.go b/internal/etl/component/types.go index d1b97133..49e2b478 100644 --- a/internal/etl/component/types.go +++ b/internal/etl/component/types.go @@ -56,7 +56,7 @@ const ( ) type ( - // OracleConstructorFunc ... Type declaration that a registry oracle component constructor must adhere to + // OracleConstructorFunc ... Type declaration that a registry reader component constructor must adhere to OracleConstructorFunc = func(context.Context, *core.ClientConfig, ...Option) (Component, error) // PipeConstructorFunc ... Type declaration that a registry pipe component constructor must adhere to @@ -67,8 +67,8 @@ type ( type OracleType = string const ( - // BackTestOracle ... Represents an oracle used for backtesting some heuristic + // BackTestOracle ... Represents an reader used for backtesting some heuristic BacktestOracle OracleType = "backtest" - // LiveOracle ... Represents an oracle used for powering some live heuristic + // LiveOracle ... Represents an reader used for powering some live heuristic LiveOracle OracleType = "live" ) diff --git a/internal/etl/pipeline/analysis_test.go b/internal/etl/pipeline/analysis_test.go index 99b6e055..d36003a4 100644 --- a/internal/etl/pipeline/analysis_test.go +++ b/internal/etl/pipeline/analysis_test.go @@ -30,7 +30,7 @@ func Test_Mergable(t *testing.T) { }, testLogic: func(t *testing.T, a pipeline.Analyzer) { // Setup test pipelines - mockOracle, err := mocks.NewDummyOracle(context.Background(), core.GethBlock) + mockOracle, err := mocks.NewDummyReader(context.Background(), core.BlockHeader) assert.NoError(t, err) comps := []component.Component{mockOracle} @@ -61,7 +61,7 @@ func Test_Mergable(t *testing.T) { }, testLogic: func(t *testing.T, a pipeline.Analyzer) { // Setup test pipelines - mockOracle, err := mocks.NewDummyOracle(context.Background(), core.GethBlock) + mockOracle, err := mocks.NewDummyReader(context.Background(), core.BlockHeader) assert.NoError(t, err) comps := []component.Component{mockOracle} diff --git a/internal/etl/pipeline/graph_test.go b/internal/etl/pipeline/graph_test.go index 0223f56a..846e6b1b 100644 --- a/internal/etl/pipeline/graph_test.go +++ b/internal/etl/pipeline/graph_test.go @@ -36,7 +36,7 @@ func Test_Graph(t *testing.T) { testLogic: func(t *testing.T, g pl.ComponentGraph) { cUUID := core.MakeCUUID(69, 69, 69, 69) - component, err := mocks.NewDummyPipe(context.Background(), core.GethBlock, core.AccountBalance) + component, err := mocks.NewDummyPipe(context.Background(), core.BlockHeader, core.BlockHeader) assert.NoError(t, err) err = g.AddComponent(cUUID, component) @@ -63,7 +63,7 @@ func Test_Graph(t *testing.T) { constructionLogic: func() pl.ComponentGraph { g := pl.NewComponentGraph() - comp1, err := mocks.NewDummyOracle(context.Background(), core.GethBlock) + comp1, err := mocks.NewDummyReader(context.Background(), core.BlockHeader) if err != nil { panic(err) } @@ -72,7 +72,7 @@ func Test_Graph(t *testing.T) { panic(err) } - comp2, err := mocks.NewDummyPipe(context.Background(), core.GethBlock, core.AccountBalance) + comp2, err := mocks.NewDummyPipe(context.Background(), core.BlockHeader, core.BlockHeader) if err != nil { panic(err) } @@ -102,7 +102,7 @@ func Test_Graph(t *testing.T) { constructionLogic: func() pl.ComponentGraph { g := pl.NewComponentGraph() - comp1, err := mocks.NewDummyOracle(context.Background(), core.GethBlock) + comp1, err := mocks.NewDummyReader(context.Background(), core.BlockHeader) if err != nil { panic(err) } @@ -111,7 +111,7 @@ func Test_Graph(t *testing.T) { panic(err) } - comp2, err := mocks.NewDummyPipe(context.Background(), core.GethBlock, core.AccountBalance) + comp2, err := mocks.NewDummyPipe(context.Background(), core.BlockHeader, core.BlockHeader) if err != nil { panic(err) } @@ -141,7 +141,7 @@ func Test_Graph(t *testing.T) { constructionLogic: func() pl.ComponentGraph { g := pl.NewComponentGraph() - comp1, err := mocks.NewDummyOracle(context.Background(), core.GethBlock) + comp1, err := mocks.NewDummyReader(context.Background(), core.BlockHeader) if err != nil { panic(err) } @@ -150,7 +150,7 @@ func Test_Graph(t *testing.T) { panic(err) } - comp2, err := mocks.NewDummyPipe(context.Background(), core.GethBlock, core.AccountBalance) + comp2, err := mocks.NewDummyPipe(context.Background(), core.BlockHeader, core.BlockHeader) if err != nil { panic(err) } diff --git a/internal/etl/pipeline/manager.go b/internal/etl/pipeline/manager.go index 1e88ab75..08b47b82 100644 --- a/internal/etl/pipeline/manager.go +++ b/internal/etl/pipeline/manager.go @@ -22,7 +22,7 @@ type Manager interface { InferComponent(cc *core.ClientConfig, cUUID core.CUUID, pUUID core.PUUID, register *core.DataRegister) (component.Component, error) GetStateKey(rt core.RegisterType) (*core.StateKey, bool, error) - GetPipelineHeight(id core.PUUID) (*big.Int, error) + GetHeightAtPipeline(id core.PUUID) (*big.Int, error) CreateDataPipeline(cfg *core.PipelineConfig) (core.PUUID, bool, error) RunPipeline(pID core.PUUID) error ActiveCount() int @@ -243,25 +243,22 @@ func (em *etlManager) InferComponent(cc *core.ClientConfig, cUUID core.CUUID, pU } switch register.ComponentType { - case core.Oracle: - init, success := register.ComponentConstructor.(component.OracleConstructorFunc) + case core.Reader: + init, success := register.Constructor.(component.OracleConstructorFunc) if !success { - return nil, fmt.Errorf(fmt.Sprintf(couldNotCastErr, core.Oracle.String())) + return nil, fmt.Errorf(fmt.Sprintf(couldNotCastErr, core.Reader.String())) } return init(em.ctx, cc, opts...) - case core.Pipe: - init, success := register.ComponentConstructor.(component.PipeConstructorFunc) + case core.Transformer: + init, success := register.Constructor.(component.PipeConstructorFunc) if !success { - return nil, fmt.Errorf(fmt.Sprintf(couldNotCastErr, core.Pipe.String())) + return nil, fmt.Errorf(fmt.Sprintf(couldNotCastErr, core.Reader.String())) } return init(em.ctx, cc, opts...) - case core.Aggregator: - return nil, fmt.Errorf(noAggregatorErr) - default: return nil, fmt.Errorf(unknownCompType, register.ComponentType.String()) } @@ -281,7 +278,7 @@ func (em *etlManager) GetStateKey(rt core.RegisterType) (*core.StateKey, bool, e return nil, false, nil } -func (em *etlManager) GetPipelineHeight(id core.PUUID) (*big.Int, error) { +func (em *etlManager) GetHeightAtPipeline(id core.PUUID) (*big.Int, error) { pipeline, err := em.store.GetPipelineFromPUUID(id) if err != nil { return nil, err diff --git a/internal/etl/pipeline/manager_test.go b/internal/etl/pipeline/manager_test.go index 611a4405..33ee6411 100644 --- a/internal/etl/pipeline/manager_test.go +++ b/internal/etl/pipeline/manager_test.go @@ -42,7 +42,7 @@ func Test_Manager(t *testing.T) { testLogic: func(t *testing.T, m Manager) { cUUID := core.MakeCUUID(1, 1, 1, 1) - register, err := registry.NewRegistry().GetRegister(core.GethBlock) + register, err := registry.NewRegistry().GetRegister(core.BlockHeader) assert.NoError(t, err) diff --git a/internal/etl/pipeline/pipeline.go b/internal/etl/pipeline/pipeline.go index 86ad958a..51d404d7 100644 --- a/internal/etl/pipeline/pipeline.go +++ b/internal/etl/pipeline/pipeline.go @@ -72,12 +72,12 @@ func (pl *pipeline) UUID() core.PUUID { func (pl *pipeline) BlockHeight() (*big.Int, error) { comp := pl.components[len(pl.components)-1] - oracle, ok := comp.(*component.Oracle) + cr, ok := comp.(*component.ChainReader) if !ok { - return nil, fmt.Errorf("could not cast component to oracle") + return nil, fmt.Errorf("could not cast component to chain reader") } - return oracle.Height() + return cr.Height() } // AddEngineRelay ... Adds a relay to the pipeline that forces it to send transformed heuristic input diff --git a/internal/etl/pipeline/pipeline_test.go b/internal/etl/pipeline/pipeline_test.go index bcc0e22c..10e08c0d 100644 --- a/internal/etl/pipeline/pipeline_test.go +++ b/internal/etl/pipeline/pipeline_test.go @@ -27,12 +27,12 @@ func Test_Pipeline(t *testing.T) { constructionLogic: func() pipeline.Pipeline { testPipe, _ := mocks.NewDummyPipe( context.Background(), - core.GethBlock, + core.BlockHeader, core.EventLog) - testO, _ := mocks.NewDummyOracle( + testO, _ := mocks.NewDummyReader( context.Background(), - core.GethBlock) + core.BlockHeader) pl, err := pipeline.NewPipeline( nil, @@ -48,7 +48,7 @@ func Test_Pipeline(t *testing.T) { testLogic: func(t *testing.T, pl pipeline.Pipeline) { assert.Equal(t, pl.Components()[0].OutputType(), core.EventLog) - assert.Equal(t, pl.Components()[1].OutputType(), core.GethBlock) + assert.Equal(t, pl.Components()[1].OutputType(), core.BlockHeader) }, }, { @@ -56,9 +56,9 @@ func Test_Pipeline(t *testing.T) { function: "AddEngineRelay", constructionLogic: func() pipeline.Pipeline { - testO, _ := mocks.NewDummyOracle( + testO, _ := mocks.NewDummyReader( context.Background(), - core.GethBlock) + core.BlockHeader) pl, err := pipeline.NewPipeline( nil, @@ -83,9 +83,9 @@ func Test_Pipeline(t *testing.T) { function: "RunPipeline", constructionLogic: func() pipeline.Pipeline { - testO, _ := mocks.NewDummyOracle( + testO, _ := mocks.NewDummyReader( context.Background(), - core.GethBlock) + core.BlockHeader) pl, err := pipeline.NewPipeline( nil, diff --git a/internal/etl/pipeline/store_test.go b/internal/etl/pipeline/store_test.go index 06b9ed22..10b3513a 100644 --- a/internal/etl/pipeline/store_test.go +++ b/internal/etl/pipeline/store_test.go @@ -22,12 +22,12 @@ var ( // getTestPipeLine ... Returns a test pipeline func getTestPipeLine(ctx context.Context) pl.Pipeline { - c1, err := mocks.NewDummyOracle(ctx, core.GethBlock, component.WithCUUID(cID2)) + c1, err := mocks.NewDummyReader(ctx, core.BlockHeader, component.WithCUUID(cID2)) if err != nil { panic(err) } - c2, err := mocks.NewDummyPipe(ctx, core.GethBlock, core.EventLog, component.WithCUUID(cID1)) + c2, err := mocks.NewDummyPipe(ctx, core.BlockHeader, core.EventLog, component.WithCUUID(cID1)) if err != nil { panic(err) } diff --git a/internal/etl/registry/block.go b/internal/etl/registry/block.go new file mode 100644 index 00000000..d696d530 --- /dev/null +++ b/internal/etl/registry/block.go @@ -0,0 +1,148 @@ +package registry + +import ( + "context" + "math/big" + "time" + + "github.com/base-org/pessimism/internal/client" + "github.com/base-org/pessimism/internal/core" + "github.com/base-org/pessimism/internal/etl/component" + "github.com/base-org/pessimism/internal/metrics" + ix_node "github.com/ethereum-optimism/optimism/indexer/node" + "github.com/ethereum/go-ethereum/core/types" +) + +const ( + batchSize = 100 + + notFoundMsg = "not found" +) + +type BlockTraversal struct { + n core.Network + cUUID core.CUUID + pUUID core.PUUID + + client ix_node.EthClient + traversal *ix_node.HeaderTraversal + pollInterval time.Duration + + stats metrics.Metricer +} + +func NewBlockTraversal(ctx context.Context, cfg *core.ClientConfig, + opts ...component.Option) (component.Component, error) { + clients, err := client.FromContext(ctx) + if err != nil { + return nil, err + } + + node, err := clients.NodeClient(cfg.Network) + if err != nil { + return nil, err + } + + var startHeader *types.Header + if cfg.EndHeight != nil { + header, err := node.BlockHeaderByNumber(cfg.EndHeight) + if err != nil { + return nil, err + } + + startHeader = header + } + + // TODO - Support network confirmation counts + ht := ix_node.NewHeaderTraversal(node, startHeader, big.NewInt(0)) + + bt := &BlockTraversal{ + n: cfg.Network, + client: node, + traversal: ht, + pollInterval: time.Duration(cfg.PollInterval) * time.Millisecond, + } + + reader, err := component.NewReader(ctx, core.BlockHeader, bt, opts...) + if err != nil { + return nil, err + } + + bt.cUUID = reader.UUID() + bt.pUUID = reader.PUUID() + return reader, nil +} + +func (bt *BlockTraversal) Height() (*big.Int, error) { + return bt.traversal.LastHeader().Number, nil +} + +func (bt *BlockTraversal) Backfill(start, end *big.Int, consumer chan core.TransitData) error { + for i := start; i.Cmp(end) < 0; i.Add(i, big.NewInt(batchSize)) { + end := big.NewInt(0).Add(i, big.NewInt(batchSize)) + + headers, err := bt.client.BlockHeadersByRange(i, end) + if err != nil { + return err + } + + for _, header := range headers { + consumer <- core.TransitData{ + Timestamp: time.Now(), + Type: core.BlockHeader, + Value: header, + } + } + } + + return nil +} + +// Loop ... +func (bt *BlockTraversal) Loop(ctx context.Context, consumer chan core.TransitData) error { + ticker := time.NewTicker(1 * time.Second) + + recent, err := bt.client.BlockHeaderByNumber(nil) + if err != nil { + return err + } + + // backfill if provided starting header + if bt.traversal.LastHeader() != nil { + + bt.Backfill(bt.traversal.LastHeader().Number, recent.Number, consumer) + } else { + bt.traversal = ix_node.NewHeaderTraversal(bt.client, recent, big.NewInt(0)) + } + + for { + select { + case <-ticker.C: + + header, err := bt.client.BlockHeaderByNumber(nil) + if err != nil { + return err + } + + if header.Number.Cmp(bt.traversal.LastHeader().Number) > 0 { + headers, err := bt.traversal.NextFinalizedHeaders(batchSize) + if err != nil { + return err + } + + for _, header := range headers { + consumer <- core.TransitData{ + Network: bt.n, + Timestamp: time.Now(), + Type: core.BlockHeader, + Value: header, + } + } + } + + case <-ctx.Done(): + return nil + } + } + +} diff --git a/internal/etl/registry/pipe/event_log.go b/internal/etl/registry/event.go similarity index 95% rename from internal/etl/registry/pipe/event_log.go rename to internal/etl/registry/event.go index 035fcbe2..42f6a1cc 100644 --- a/internal/etl/registry/pipe/event_log.go +++ b/internal/etl/registry/event.go @@ -1,4 +1,4 @@ -package pipe +package registry import ( "context" @@ -70,7 +70,7 @@ func NewEventParserPipe(ctx context.Context, cfg *core.ClientConfig, } // 2. Embed the definition into a generic pipe construction - p, err := component.NewPipe(ctx, ed, core.GethBlock, core.EventLog, opts...) + p, err := component.NewPipe(ctx, ed, core.BlockHeader, core.EventLog, opts...) if err != nil { return nil, err } @@ -201,13 +201,11 @@ func (ed *EventDefinition) attemptDLQ(ctx context.Context) ([]core.TransitData, // transformFunc ... Gets the events from the block, filters them and // returns them if they are in the list of events to monitor func (ed *EventDefinition) transformFunc(ctx context.Context, td core.TransitData) ([]core.TransitData, error) { - // 1. Convert arbitrary transit data to go-ethereum compatible block type - block, success := td.Value.(types.Block) + header, success := td.Value.(types.Header) if !success { - return []core.TransitData{}, fmt.Errorf("could not convert to block") + return []core.TransitData{}, fmt.Errorf("could not convert to header") } - // 2. Fetch the addresses and events to monitor for logging.NoContext().Debug("Getting addresses", zap.String(logging.PUUIDKey, ed.pUUID.String())) @@ -217,7 +215,7 @@ func (ed *EventDefinition) transformFunc(ctx context.Context, td core.TransitDat } topics := ed.getTopics(ctx, addresses, ed.ss) - hash := block.Header().Hash() + hash := header.Hash() // 3. Construct and execute a filter query on the provided block hash // to get the relevant logs diff --git a/internal/etl/registry/pipe/event_log_test.go b/internal/etl/registry/event_test.go similarity index 96% rename from internal/etl/registry/pipe/event_log_test.go rename to internal/etl/registry/event_test.go index f8c278e2..ccb59978 100644 --- a/internal/etl/registry/pipe/event_log_test.go +++ b/internal/etl/registry/event_test.go @@ -1,4 +1,4 @@ -package pipe_test +package registry_test import ( "context" @@ -6,7 +6,6 @@ import ( "testing" "github.com/base-org/pessimism/internal/core" - "github.com/base-org/pessimism/internal/etl/registry/pipe" "github.com/base-org/pessimism/internal/state" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -14,6 +13,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/base-org/pessimism/internal/etl/component" + "github.com/base-org/pessimism/internal/etl/registry" "github.com/base-org/pessimism/internal/mocks" ) @@ -42,7 +42,7 @@ func defConstructor(t *testing.T) *testSuite { _ = state.InsertUnique(ctx, innerKey, "transfer(address,address,uint256)") - ed, err := pipe.NewEventDefinition(ctx, core.Layer1) + ed, err := registry.NewEventDefinition(ctx, core.Layer1) if err != nil { t.Fatal(err) } diff --git a/internal/etl/registry/oracle/account_balance.go b/internal/etl/registry/oracle/account_balance.go deleted file mode 100644 index 150e0a82..00000000 --- a/internal/etl/registry/oracle/account_balance.go +++ /dev/null @@ -1,121 +0,0 @@ -package oracle - -import ( - "context" - "fmt" - "math/big" - "time" - - "github.com/base-org/pessimism/internal/client" - pess_common "github.com/base-org/pessimism/internal/common" - "github.com/base-org/pessimism/internal/core" - "github.com/base-org/pessimism/internal/etl/component" - "github.com/base-org/pessimism/internal/logging" - "github.com/base-org/pessimism/internal/state" - "github.com/ethereum/go-ethereum/common" - - "go.uber.org/zap" -) - -// TODO(#21): Verify config validity during Oracle construction -// AddressBalanceODef ... Address register oracle definition used to drive oracle component -type AddressBalanceODef struct { - pUUID core.PUUID - cfg *core.ClientConfig - client client.EthClient - currHeight *big.Int - sk *core.StateKey -} - -func (oracle *AddressBalanceODef) Height() (*big.Int, error) { - return oracle.currHeight, nil -} - -// NewAddressBalanceODef ... Initializer for address.balance oracle definition -func NewAddressBalanceODef(cfg *core.ClientConfig, client client.EthClient, - h *big.Int) *AddressBalanceODef { - return &AddressBalanceODef{ - cfg: cfg, - client: client, - currHeight: h, - } -} - -// NewAddressBalanceOracle ... Initializer for address.balance oracle component -func NewAddressBalanceOracle(ctx context.Context, cfg *core.ClientConfig, - opts ...component.Option) (component.Component, error) { - client, err := client.FromNetwork(ctx, cfg.Network) - if err != nil { - return nil, err - } - - od := NewAddressBalanceODef(cfg, client, nil) - o, err := component.NewOracle(ctx, core.GethBlock, od, opts...) - if err != nil { - return nil, err - } - - od.sk = o.StateKey().Clone() - return o, nil -} - -// BackTestRoutine ... -// NOTE - This oracle does not support backtesting -// TODO (#59) : Add account balance backtesting support -func (oracle *AddressBalanceODef) BackTestRoutine(_ context.Context, _ chan core.TransitData, - _ *big.Int, _ *big.Int) error { - return fmt.Errorf(noBackTestSupportError) -} - -// ReadRoutine ... Sequentially polls go-ethereum compatible execution -// client for address (EOA, Contract) native balance amounts -func (oracle *AddressBalanceODef) ReadRoutine(ctx context.Context, componentChan chan core.TransitData) error { - stateStore, err := state.FromContext(ctx) - if err != nil { - return err - } - - ticker := time.NewTicker(oracle.cfg.PollInterval * time.Millisecond) //nolint:durationcheck // inapplicable - for { - select { - case <-ticker.C: // Polling - ts := time.Now() - logging.NoContext().Debug("Getting addresess", - zap.String(logging.PUUIDKey, oracle.pUUID.String())) - - // Get addresses from shared state store for pipeline uuid - - addresses, err := stateStore.GetSlice(ctx, oracle.sk) - if err != nil { - logging.WithContext(ctx).Error(err.Error()) - continue - } - - for _, address := range addresses { - // Convert to go-ethereum address type - gethAddress := common.HexToAddress(address) - logging.NoContext().Debug("Balance query", - zap.String(logging.AddrKey, gethAddress.String())) - - // Get balance using go-ethereum client - weiBalance, err := oracle.client.BalanceAt(ctx, gethAddress, nil) - if err != nil { - logging.WithContext(ctx).Error(err.Error()) - continue - } - - // Convert wei to ether - // NOTE - There is a possibility of precision loss here - // TODO (#58) : Verify precision loss - ethBalance, _ := pess_common.WeiToEther(weiBalance).Float64() - - // Send parsed float64 balance value to downstream component channel - componentChan <- core.NewTransitData(core.AccountBalance, ethBalance, - core.WithAddress(gethAddress), core.WithOriginTS(ts)) - } - - case <-ctx.Done(): // Shutdown - return nil - } - } -} diff --git a/internal/etl/registry/oracle/geth_block.go b/internal/etl/registry/oracle/geth_block.go deleted file mode 100644 index c9860e45..00000000 --- a/internal/etl/registry/oracle/geth_block.go +++ /dev/null @@ -1,279 +0,0 @@ -package oracle - -import ( - "context" - "errors" - "math/big" - "time" - - "github.com/base-org/pessimism/internal/client" - "github.com/base-org/pessimism/internal/core" - "github.com/base-org/pessimism/internal/etl/component" - "github.com/base-org/pessimism/internal/logging" - "github.com/base-org/pessimism/internal/metrics" - "github.com/ethereum/go-ethereum/core/types" - "go.uber.org/zap" -) - -const ( - notFoundMsg = "not found" -) - -// TODO(#21): Verify config validity during Oracle construction -// GethBlockODef ...GethBlock register oracle definition used to drive oracle component -type GethBlockODef struct { - cUUID core.CUUID - pUUID core.PUUID - cfg *core.ClientConfig - client client.EthClient - currHeight *big.Int - - stats metrics.Metricer -} - -// NewGethBlockODef ... Initializer for geth.block oracle definition -func NewGethBlockODef(cfg *core.ClientConfig, client client.EthClient, - h *big.Int, stats metrics.Metricer) *GethBlockODef { - return &GethBlockODef{ - cfg: cfg, - client: client, - currHeight: h, - stats: stats, - } -} - -// NewGethBlockOracle ... Initializer for geth.block oracle component -func NewGethBlockOracle(ctx context.Context, cfg *core.ClientConfig, - opts ...component.Option) (component.Component, error) { - client, err := client.FromNetwork(ctx, cfg.Network) - if err != nil { - return nil, err - } - - od := NewGethBlockODef(cfg, client, nil, metrics.WithContext(ctx)) - - oracle, err := component.NewOracle(ctx, core.GethBlock, od, opts...) - if err != nil { - return nil, err - } - - od.cUUID = oracle.UUID() - od.pUUID = oracle.PUUID() - return oracle, nil -} - -func (oracle *GethBlockODef) Height() (*big.Int, error) { - return oracle.currHeight, nil -} - -// getCurrentHeightFromNetwork ... Gets the current height of the network and will not quit until found -func (oracle *GethBlockODef) getCurrentHeightFromNetwork(ctx context.Context) *types.Header { - for { - header, err := oracle.client.HeaderByNumber(ctx, nil) - if err != nil { - oracle.stats.RecordNodeError(oracle.cfg.Network) - logging.WithContext(ctx).Error("problem fetching current height from network", zap.Error(err)) - continue - } - return header - } -} - -// BackTestRoutine ... -func (oracle *GethBlockODef) BackTestRoutine(ctx context.Context, componentChan chan core.TransitData, - startHeight *big.Int, endHeight *big.Int) error { - if endHeight.Cmp(startHeight) < 0 { - return errors.New("start height cannot be more than the end height") - } - - currentHeader := oracle.getCurrentHeightFromNetwork(ctx) - - if startHeight.Cmp(currentHeader.Number) == 1 { - return errors.New("start height cannot be more than the latest height from network") - } - - ticker := time.NewTicker(oracle.cfg.PollInterval * time.Millisecond) //nolint:durationcheck // inapplicable - height := startHeight - - for { - select { - case <-ticker.C: - - headerAsInterface, err := oracle.fetchData(ctx, height, core.FetchHeader) - headerAsserted, headerAssertedOk := headerAsInterface.(*types.Header) - - if err != nil || !headerAssertedOk { - logging.WithContext(ctx).Error("problem fetching or asserting header", zap.NamedError("headerFetch", err), - zap.Bool("headerAsserted", headerAssertedOk)) - oracle.stats.RecordNodeError(oracle.cfg.Network) - continue - } - - blockAsInterface, err := oracle.fetchData(ctx, headerAsserted.Number, core.FetchBlock) - blockAsserted, blockAssertedOk := blockAsInterface.(*types.Block) - - if err != nil || !blockAssertedOk { - // logging.WithContext(ctx).Error("problem fetching or asserting block", zap.NamedError("blockFetch", err), - // zap.Bool("blockAsserted", blockAssertedOk)) - oracle.stats.RecordNodeError(oracle.cfg.Network) - continue - } - - // TODO - Add support for database persistence - componentChan <- core.TransitData{ - OriginTS: time.Now(), - Timestamp: time.Now(), - Type: core.GethBlock, - Value: *blockAsserted, - } - - if height.Cmp(endHeight) == 0 { - logging.WithContext(ctx).Info("Completed back-test routine.") - return nil - } - - height.Add(height, big.NewInt(1)) - - case <-ctx.Done(): - return nil - } - } -} - -// getHeightToProcess ... -// -// Check if current height is nil, if it is, then check if starting height is provided: -// 1. if start height is provided, use that number as the current height -// 2. if not, then sending nil as current height means use the latest -// if current height is not nil, skip all above steps and continue iterating. -// At the end, if the end height is specified and not nil, if its met, it returns once done. -// Start Height and End Height is inclusive in fetching blocks. -func (oracle *GethBlockODef) getHeightToProcess(ctx context.Context) *big.Int { - if oracle.currHeight == nil { - logging.WithContext(ctx).Info("Current Height is nil, looking for starting height") - if oracle.cfg.StartHeight != nil { - logging.WithContext(ctx).Info("StartHeight found to be: %d, using that value.", zap.Int64("StartHeight", - oracle.cfg.StartHeight.Int64())) - return oracle.cfg.StartHeight - } - logging.WithContext(ctx).Info("Starting Height is nil, using latest block as starting point.") - return nil - } - return oracle.currHeight -} - -// fetchHeaderWithRetry ... retry for specified number of times. -// Not an exponent backoff, but a simpler method which retries sooner -func (oracle *GethBlockODef) fetchData(ctx context.Context, height *big.Int, - fetchType core.FetchType) (interface{}, error) { - if fetchType == core.FetchHeader { - return oracle.client.HeaderByNumber(ctx, height) - } - return oracle.client.BlockByNumber(ctx, height) -} - -func validHeightParams(start, end *big.Int) error { - if end != nil && start == nil { - return errors.New("cannot start with latest block height with end height configured") - } - - if end != nil && start != nil && - end.Cmp(start) < 0 { - return errors.New("start height cannot be more than the end height") - } - - return nil -} - -// ReadRoutine ... Sequentially polls go-ethereum compatible execution -// client using monotonic block height variable for block metadata -// & writes block metadata to output listener components -func (oracle *GethBlockODef) ReadRoutine(ctx context.Context, componentChan chan core.TransitData) error { - // NOTE - Might need improvements in future as the project takes shape. - - // Now fetching current height from the network - // currentHeader := oracle.getCurrentHeightFromNetwork(ctx) - - // if oracle.cfg.StartHeight.Cmp(currentHeader.Number) == 1 { - // return errors.New("start height cannot be more than the latest height from network") - // } - - if err := validHeightParams(oracle.cfg.StartHeight, oracle.cfg.EndHeight); err != nil { - return err - } - - logging.WithContext(ctx). - Debug("Starting poll routine", zap.Duration("poll_interval", oracle.cfg.PollInterval), - zap.String(logging.CUUIDKey, oracle.cUUID.String())) - - ticker := time.NewTicker(oracle.cfg.PollInterval * time.Millisecond) //nolint:durationcheck // inapplicable - for { - select { - case <-ticker.C: - opStart := time.Now() - - height := oracle.getHeightToProcess(ctx) - if height != nil { - logging.WithContext(ctx).Debug("Polling block for processing", - zap.Int("Height", int(height.Int64())), - zap.String(logging.CUUIDKey, oracle.cUUID.String())) - } - - headerAsInterface, err := oracle.fetchData(ctx, height, core.FetchHeader) - headerAsserted, headerAssertedOk := headerAsInterface.(*types.Header) - - // Ensure err is indicative of block not existing yet - if err != nil && err.Error() == notFoundMsg { - continue - } - - if err != nil || !headerAssertedOk { - logging.WithContext(ctx).Error("problem fetching or asserting header", zap.NamedError("headerFetch", err), - zap.Bool("headerAsserted", headerAssertedOk), zap.String(logging.CUUIDKey, oracle.cUUID.String())) - oracle.stats.RecordNodeError(oracle.cfg.Network) - continue - } - - blockAsInterface, err := oracle.fetchData(ctx, headerAsserted.Number, core.FetchBlock) - block, blockAssertedOk := blockAsInterface.(*types.Block) - - if err != nil || !blockAssertedOk { - logging.WithContext(ctx).Error("problem fetching or asserting block", zap.NamedError("blockFetch", err), - zap.Bool("blockAsserted", blockAssertedOk), zap.String(logging.CUUIDKey, oracle.cUUID.String())) - oracle.stats.RecordNodeError(oracle.cfg.Network) - continue - } - - blockTS := time.Unix(int64(block.Time()), 0) - oracle.stats.RecordBlockLatency(oracle.cfg.Network, float64(time.Since(blockTS).Milliseconds())) - - componentChan <- core.TransitData{ - OriginTS: opStart, - Timestamp: time.Now(), - Type: core.GethBlock, - Value: *block, - } - - // check has to be done here to include the end height block - if oracle.cfg.EndHeight != nil && height.Cmp(oracle.cfg.EndHeight) == 0 { - return nil - } - - if height != nil { - height.Add(height, big.NewInt(1)) - } else { - height = &big.Int{} - height.Add(headerAsserted.Number, big.NewInt(1)) - } - - logging.NoContext().Debug("New height", zap.Int("Height", int(height.Int64())), - zap.String(logging.CUUIDKey, oracle.cUUID.String())) - - oracle.currHeight = height - - case <-ctx.Done(): - logging.NoContext().Info("Geth.block oracle routine ending", zap.String(logging.CUUIDKey, oracle.cUUID.String())) - return nil - } - } -} diff --git a/internal/etl/registry/oracle/geth_block_test.go b/internal/etl/registry/oracle/geth_block_test.go deleted file mode 100644 index 73fe2f6f..00000000 --- a/internal/etl/registry/oracle/geth_block_test.go +++ /dev/null @@ -1,424 +0,0 @@ -package oracle - -import ( - "context" - "fmt" - "math/big" - "testing" - - "github.com/base-org/pessimism/internal/core" - "github.com/base-org/pessimism/internal/logging" - "github.com/base-org/pessimism/internal/metrics" - "github.com/base-org/pessimism/internal/mocks" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/trie" - gomock "github.com/golang/mock/gomock" - "github.com/stretchr/testify/assert" -) - -func Test_GetCurrentHeightFromNetwork(t *testing.T) { - - ctx, cancel := context.WithCancel(context.Background()) - logging.New(core.Development) - defer cancel() - - // setup mock - ctrl := gomock.NewController(t) - defer ctrl.Finish() - testObj := mocks.NewMockEthClient(ctrl) - - header := types.Header{ - ParentHash: common.HexToHash("0x123456789"), - Number: big.NewInt(5), - } - // setup expectations - testObj. - EXPECT(). - HeaderByNumber(gomock.Any(), gomock.Any()). - Return(&header, nil) - - od := &GethBlockODef{cfg: &core.ClientConfig{ - NumOfRetries: 3, - }, currHeight: nil, client: testObj} - - assert.Equal(t, od.getCurrentHeightFromNetwork(ctx).Number, header.Number) -} - -func Test_GetHeightToProcess(t *testing.T) { - - ctx, cancel := context.WithCancel(context.Background()) - logging.New(core.Development) - defer cancel() - - // setup mock - ctrl := gomock.NewController(t) - defer ctrl.Finish() - testObj := mocks.NewMockEthClient(ctrl) - - header := types.Header{ - ParentHash: common.HexToHash("0x123456789"), - Number: big.NewInt(5), - } - testObj. - EXPECT(). - HeaderByNumber(gomock.Any(), gomock.Any()). - Return(&header, nil). - AnyTimes() - - od := &GethBlockODef{cfg: &core.ClientConfig{ - NumOfRetries: 3, - }, currHeight: big.NewInt(123), client: testObj} - - assert.Equal(t, od.getHeightToProcess(ctx), big.NewInt(123)) - - od.currHeight = nil - od.cfg.StartHeight = big.NewInt(123) - assert.Equal(t, od.getHeightToProcess(ctx), big.NewInt(123)) - - od.currHeight = nil - od.cfg.StartHeight = nil - assert.Nil(t, od.getHeightToProcess(ctx)) -} - -func Test_Backroutine(t *testing.T) { - logging.New(core.Development) - var tests = []struct { - name string - description string - - constructionLogic func() (*GethBlockODef, chan core.TransitData) - testLogic func(*testing.T, *GethBlockODef, chan core.TransitData) - }{ - - { - name: "Current network height check", - description: "Check if network height check is less than starting height", - - constructionLogic: func() (*GethBlockODef, chan core.TransitData) { - // setup mock - ctrl := gomock.NewController(t) - defer ctrl.Finish() - testObj := mocks.NewMockEthClient(ctrl) - - header := types.Header{ - ParentHash: common.HexToHash("0x123456789"), - Number: big.NewInt(5), - } - // setup expectationss - testObj. - EXPECT(). - HeaderByNumber(gomock.Any(), gomock.Any()). - Return(&header, nil). - AnyTimes() - - od := &GethBlockODef{cfg: &core.ClientConfig{ - NumOfRetries: 3, - }, currHeight: nil, client: testObj} - - outChan := make(chan core.TransitData) - - return od, outChan - }, - - testLogic: func(t *testing.T, od *GethBlockODef, outChan chan core.TransitData) { - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - err := od.BackTestRoutine(ctx, outChan, big.NewInt(7), big.NewInt(10)) - assert.Error(t, err) - assert.EqualError(t, err, "start height cannot be more than the latest height from network") - }, - }, - { - name: "Successful Height check", - description: "Ending height cannot be less than the Starting height", - - constructionLogic: func() (*GethBlockODef, chan core.TransitData) { - // setup mock - ctrl := gomock.NewController(t) - defer ctrl.Finish() - testObj := mocks.NewMockEthClient(ctrl) - - od := &GethBlockODef{cfg: &core.ClientConfig{ - NumOfRetries: 3, - }, currHeight: nil, client: testObj} - - outChan := make(chan core.TransitData) - - return od, outChan - }, - - testLogic: func(t *testing.T, od *GethBlockODef, outChan chan core.TransitData) { - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - err := od.BackTestRoutine(ctx, outChan, big.NewInt(2), big.NewInt(1)) - assert.Error(t, err) - assert.EqualError(t, err, "start height cannot be more than the end height") - }, - }, - // Leaving this here to help devs test infinite loops - // - //{ - // name: "Header fetch retry exceeded error check", - // description: "Check if the header fetch retry fails after 3 retries, total 4 tries.", - // - // constructionLogic: func() (*GethBlockODef, chan core.TransitData) { - // testObj := new(EthClientMocked) - // - // // setup expectations - // testObj.On("DialContext", mock.Anything, "pass test").Return(nil) - // testObj.On("HeaderByNumber", mock.Anything, mock.Anything).Return(nil, errors.New("no header for you")) - // - // od := &GethBlockODef{cfg: &core.ClientConfig{ - // RPCEndpoint: "pass test", - // NumOfRetries: 3, - // }, currHeight: nil, client: testObj} - // - // outChan := make(chan core.TransitData) - // return od, outChan - // }, - // - // testLogic: func(t *testing.T, od *GethBlockODef, outChan chan core.TransitData) { - // - // ctx, cancel := context.WithCancel(context.Background()) - // defer cancel() - // - // err := od.BackTestRoutine(ctx, outChan, big.NewInt(1), big.NewInt(2)) - // assert.Error(t, err) - // assert.EqualError(t, err, "no header for you") - // }, - // }, - { - name: "Backroutine happy path test", - description: "Backroutine works and channel should have 4 messages waiting.", - - constructionLogic: func() (*GethBlockODef, chan core.TransitData) { - // setup mock - ctrl := gomock.NewController(t) - defer ctrl.Finish() - testObj := mocks.NewMockEthClient(ctrl) - - header := types.Header{ - ParentHash: common.HexToHash("0x123456789"), - Number: big.NewInt(7), - } - block := types.NewBlock(&header, nil, nil, nil, trie.NewStackTrie(nil)) - // setup expectations - testObj. - EXPECT(). - HeaderByNumber(gomock.Any(), gomock.Any()). - Return(&header, nil). - AnyTimes() - testObj. - EXPECT(). - BlockByNumber(gomock.Any(), gomock.Any()). - Return(block, nil). - AnyTimes() - - od := &GethBlockODef{cfg: &core.ClientConfig{ - NumOfRetries: 3, - PollInterval: 1000, - }, currHeight: nil, client: testObj} - - outChan := make(chan core.TransitData, 2) - - return od, outChan - }, - - testLogic: func(t *testing.T, od *GethBlockODef, outChan chan core.TransitData) { - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - err := od.BackTestRoutine(ctx, outChan, big.NewInt(5), big.NewInt(6)) - assert.NoError(t, err) - close(outChan) - - for m := range outChan { - val := m.Value.(types.Block) //nolint:errcheck // converting to type from any for getting internal values - assert.Equal(t, val.ParentHash(), common.HexToHash("0x123456789")) - } - }, - }, - } - - for i, tc := range tests { - t.Run(fmt.Sprintf("%d-%s", i, tc.name), func(t *testing.T) { - od, outChan := tc.constructionLogic() - tc.testLogic(t, od, outChan) - }) - - } -} - -func Test_ReadRoutine(t *testing.T) { - logging.New(core.Development) - var tests = []struct { - name string - description string - - constructionLogic func() (*GethBlockODef, chan core.TransitData) - testLogic func(*testing.T, *GethBlockODef, chan core.TransitData) - }{ - - { - name: "Successful Height check 1", - description: "Ending height cannot be less than the Starting height", - - constructionLogic: func() (*GethBlockODef, chan core.TransitData) { - // setup mock - ctrl := gomock.NewController(t) - defer ctrl.Finish() - testObj := mocks.NewMockEthClient(ctrl) - - od := &GethBlockODef{cfg: &core.ClientConfig{ - StartHeight: big.NewInt(2), - EndHeight: big.NewInt(1), - NumOfRetries: 3, - PollInterval: 1000, - }, currHeight: nil, client: testObj} - outChan := make(chan core.TransitData) - return od, outChan - }, - testLogic: func(t *testing.T, od *GethBlockODef, outChan chan core.TransitData) { - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - err := od.ReadRoutine(ctx, outChan) - assert.Error(t, err) - assert.EqualError(t, err, "start height cannot be more than the end height") - }, - }, - { - name: "Successful Height check 2", - description: "Cannot have start height nil, i.e, latest block and end height configured", - - constructionLogic: func() (*GethBlockODef, chan core.TransitData) { - // setup mock - ctrl := gomock.NewController(t) - defer ctrl.Finish() - testObj := mocks.NewMockEthClient(ctrl) - - od := &GethBlockODef{cfg: &core.ClientConfig{ - StartHeight: nil, - EndHeight: big.NewInt(1), - NumOfRetries: 3, - PollInterval: 1000, - }, currHeight: nil, client: testObj, stats: metrics.NoopMetrics} - outChan := make(chan core.TransitData) - return od, outChan - }, - - testLogic: func(t *testing.T, od *GethBlockODef, outChan chan core.TransitData) { - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - err := od.ReadRoutine(ctx, outChan) - assert.Error(t, err) - assert.EqualError(t, err, "cannot start with latest block height with end height configured") - }, - }, - { - description: "Making sure that number of blocks fetched matches the assumption. Number of messages should be 5, in the channel", - - constructionLogic: func() (*GethBlockODef, chan core.TransitData) { - // setup mock - ctrl := gomock.NewController(t) - defer ctrl.Finish() - testObj := mocks.NewMockEthClient(ctrl) - - header := types.Header{ - ParentHash: common.HexToHash("0x123456789"), - Number: big.NewInt(7), - } - block := types.NewBlock(&header, nil, nil, nil, trie.NewStackTrie(nil)) - - testObj. - EXPECT(). - HeaderByNumber(gomock.Any(), gomock.Any()). - Return(&header, nil). - AnyTimes() - testObj. - EXPECT(). - BlockByNumber(gomock.Any(), gomock.Any()). - Return(block, nil). - AnyTimes() - - od := &GethBlockODef{cfg: &core.ClientConfig{ - StartHeight: big.NewInt(1), - EndHeight: big.NewInt(5), - NumOfRetries: 3, - PollInterval: 1000, - }, currHeight: nil, client: testObj, stats: metrics.NoopMetrics} - outChan := make(chan core.TransitData, 10) - return od, outChan - }, - - testLogic: func(t *testing.T, od *GethBlockODef, outChan chan core.TransitData) { - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - err := od.ReadRoutine(ctx, outChan) - assert.NoError(t, err) - close(outChan) - assert.Equal(t, len(outChan), 5) - }, - }, - // Leaving this here to help devs test infinite loops - // - //{ - // name: "Latest block check", - // description: "Making sure that number of blocks fetched matches the assumption. Number of messages should be 5, in the channel", - // - // constructionLogic: func() (*GethBlockODef, chan core.TransitData) { - // testObj := new(EthClientMocked) - // header := types.Header{ - // ParentHash: common.HexToHash("0x123456789"), - // Number: big.NewInt(1), - // } - // block := types.NewBlock(&header, nil, nil, nil, trie.NewStackTrie(nil)) - // // setup expectations - // testObj.On("DialContext", mock.Anything, "pass test").Return(nil) - // testObj.On("HeaderByNumber", mock.Anything, mock.Anything).Return(&header, nil) - // testObj.On("BlockByNumber", mock.Anything, mock.Anything).Return(block, nil) - // - // od := &GethBlockODef{cfg: &core.ClientConfig{ - // RPCEndpoint: "pass test", - // StartHeight: nil, - // EndHeight: nil, - // NumOfRetries: 3, - // PollInterval: 1000, - - // }, currHeight: nil, client: testObj} - // outChan := make(chan core.TransitData, 10) - // return od, outChan - // }, - // - // testLogic: func(t *testing.T, od *GethBlockODef, outChan chan core.TransitData) { - // - // ctx, cancel := context.WithCancel(context.Background()) - // defer cancel() - // - // err := od.ReadRoutine(ctx, outChan) - // assert.NoError(t, err) - // close(outChan) - // assert.Equal(t, len(outChan), 5) - // }, - // }, - } - - for i, tc := range tests { - t.Run(fmt.Sprintf("%d-%s", i, tc.name), func(t *testing.T) { - od, outChan := tc.constructionLogic() - tc.testLogic(t, od, outChan) - }) - - } -} diff --git a/internal/etl/registry/oracle/types.go b/internal/etl/registry/oracle/types.go deleted file mode 100644 index f685bc90..00000000 --- a/internal/etl/registry/oracle/types.go +++ /dev/null @@ -1,5 +0,0 @@ -package oracle - -const ( - noBackTestSupportError = "backtest routine is unimplemented" -) diff --git a/internal/etl/registry/registry.go b/internal/etl/registry/registry.go index 3d591e0d..100f8899 100644 --- a/internal/etl/registry/registry.go +++ b/internal/etl/registry/registry.go @@ -4,8 +4,6 @@ import ( "fmt" "github.com/base-org/pessimism/internal/core" - "github.com/base-org/pessimism/internal/etl/registry/oracle" - "github.com/base-org/pessimism/internal/etl/registry/pipe" ) const ( @@ -27,36 +25,23 @@ type componentRegistry struct { // that contains all extractable ETL data types func NewRegistry() Registry { registers := map[core.RegisterType]*core.DataRegister{ - core.GethBlock: { - Addressing: false, - DataType: core.GethBlock, - ComponentType: core.Oracle, - ComponentConstructor: oracle.NewGethBlockOracle, + core.BlockHeader: { + Addressing: false, + DataType: core.BlockHeader, + ComponentType: core.Reader, + Constructor: NewBlockTraversal, Dependencies: noDeps(), Sk: noState(), }, - core.AccountBalance: { - Addressing: true, - DataType: core.AccountBalance, - ComponentType: core.Oracle, - ComponentConstructor: oracle.NewAddressBalanceOracle, - Dependencies: noDeps(), - Sk: &core.StateKey{ - Nesting: false, - Prefix: core.AccountBalance, - ID: core.AddressKey, - PUUID: nil, - }, - }, core.EventLog: { - Addressing: true, - DataType: core.EventLog, - ComponentType: core.Pipe, - ComponentConstructor: pipe.NewEventParserPipe, + Addressing: true, + DataType: core.EventLog, + ComponentType: core.Transformer, + Constructor: NewEventParserPipe, - Dependencies: makeDeps(core.GethBlock), + Dependencies: makeDeps(core.BlockHeader), Sk: &core.StateKey{ Nesting: true, Prefix: core.EventLog, diff --git a/internal/etl/registry/registry_test.go b/internal/etl/registry/registry_test.go index 0084c95d..b7cb35d2 100644 --- a/internal/etl/registry/registry_test.go +++ b/internal/etl/registry/registry_test.go @@ -41,11 +41,11 @@ func Test_ComponentRegistry(t *testing.T) { constructionLogic: registry.NewRegistry, testLogic: func(t *testing.T, testRegistry registry.Registry) { - reg, err := testRegistry.GetRegister(core.GethBlock) + reg, err := testRegistry.GetRegister(core.BlockHeader) assert.NoError(t, err) assert.NotNil(t, reg) - assert.Equal(t, reg.DataType, core.GethBlock) + assert.Equal(t, reg.DataType, core.BlockHeader) }, }, { @@ -61,7 +61,7 @@ func Test_ComponentRegistry(t *testing.T) { assert.NoError(t, err) assert.Len(t, path.Path, 2) - assert.Equal(t, path.Path[1].DataType, core.GethBlock) + assert.Equal(t, path.Path[1].DataType, core.BlockHeader) assert.Equal(t, path.Path[0].DataType, core.EventLog) }, }, diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index d1b663d0..7bf98f3b 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -8,6 +8,7 @@ import ( "github.com/base-org/pessimism/internal/core" "github.com/base-org/pessimism/internal/engine/heuristic" "github.com/base-org/pessimism/internal/logging" + "github.com/ethereum/go-ethereum/rpc" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" @@ -18,6 +19,7 @@ const ( metricsNamespace = "pessimism" SubsystemHeuristics = "heuristics" SubsystemEtl = "etl" + batchMethod = "batch" ) const serverShutdownTimeout = 10 * time.Second @@ -43,20 +45,24 @@ type Metricer interface { RecordUp() Start() Shutdown(ctx context.Context) error + RecordRPCClientRequest(method string) func(err error) + RecordRPCClientBatchRequest(b []rpc.BatchElem) func(err error) Document() []DocumentedMetric } type Metrics struct { - Up prometheus.Gauge - ActivePipelines *prometheus.GaugeVec - ActiveHeuristics *prometheus.GaugeVec - HeuristicRuns *prometheus.CounterVec - AlertsGenerated *prometheus.CounterVec - NodeErrors *prometheus.CounterVec - BlockLatency *prometheus.GaugeVec - PipelineLatency *prometheus.GaugeVec - InvExecutionTime *prometheus.GaugeVec - HeuristicErrors *prometheus.CounterVec + Up prometheus.Gauge + ActivePipelines *prometheus.GaugeVec + ActiveHeuristics *prometheus.GaugeVec + HeuristicRuns *prometheus.CounterVec + AlertsGenerated *prometheus.CounterVec + NodeErrors *prometheus.CounterVec + BlockLatency *prometheus.GaugeVec + PipelineLatency *prometheus.GaugeVec + InvExecutionTime *prometheus.GaugeVec + rpcClientRequestsTotal *prometheus.CounterVec + rpcClientRequestDurationSeconds *prometheus.HistogramVec + HeuristicErrors *prometheus.CounterVec registry *prometheus.Registry factory Factory @@ -87,6 +93,23 @@ func New(ctx context.Context, cfg *Config) (Metricer, func(), error) { factory := With(registry) stats = &Metrics{ + rpcClientRequestsTotal: factory.NewCounterVec(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: SubsystemEtl, + Name: "requests_total", + Help: "Total RPC requests initiated by the RPC client", + }, []string{ + "method", + }), + rpcClientRequestDurationSeconds: factory.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: metricsNamespace, + Subsystem: SubsystemEtl, + Name: "request_duration_seconds", + Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10}, + Help: "Histogram of RPC client request durations", + }, []string{ + "method", + }), Up: factory.NewGauge(prometheus.GaugeOpts{ Namespace: metricsNamespace, Name: "up", @@ -229,6 +252,36 @@ func (m *Metrics) RecordPipelineLatency(pUUID core.PUUID, latency float64) { m.PipelineLatency.WithLabelValues(pUUID.String()).Set(latency) } +func (m *Metrics) RecordRPCClientRequest(method string) func(err error) { + m.rpcClientRequestsTotal.WithLabelValues(method).Inc() + // timer := prometheus.NewTimer(m.rpcClientRequestDurationSeconds.WithLabelValues(method)) + // return func(err error) { + // m.recordRPCClientResponse(method, err) + // timer.ObserveDuration() + // } + + return nil +} + +func (m *Metrics) RecordRPCClientBatchRequest(b []rpc.BatchElem) func(err error) { + m.rpcClientRequestsTotal.WithLabelValues(batchMethod).Add(float64(len(b))) + for _, elem := range b { + m.rpcClientRequestsTotal.WithLabelValues(elem.Method).Inc() + } + + // timer := prometheus.NewTimer(m.rpcClientRequestDurationSeconds.WithLabelValues(batchMethod)) + // return func(err error) { + // m.recordRPCClientResponse(batchMethod, err) + // timer.ObserveDuration() + + // // Record errors for individual requests + // for _, elem := range b { + // m.recordRPCClientResponse(elem.Method, elem.Error) + // } + // } + return nil +} + // Shutdown ... Shuts down the metrics server func (m *Metrics) Shutdown(ctx context.Context) error { return m.server.Shutdown(ctx) @@ -255,6 +308,12 @@ func (n *noopMetricer) RecordNodeError(_ core.Network) func (n *noopMetricer) RecordBlockLatency(_ core.Network, _ float64) {} func (n *noopMetricer) RecordPipelineLatency(_ core.PUUID, _ float64) {} func (n *noopMetricer) RecordAssessmentError(_ heuristic.Heuristic) {} +func (n *noopMetricer) RecordRPCClientRequest(method string) func(err error) { + return func(err error) {} +} +func (n *noopMetricer) RecordRPCClientBatchRequest(b []rpc.BatchElem) func(err error) { + return func(err error) {} +} func (n *noopMetricer) Shutdown(_ context.Context) error { return nil diff --git a/internal/mocks/etl_manager.go b/internal/mocks/etl_manager.go index 7687210b..82e3f4b5 100644 --- a/internal/mocks/etl_manager.go +++ b/internal/mocks/etl_manager.go @@ -80,19 +80,19 @@ func (mr *EtlManagerMockRecorder) EventLoop() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EventLoop", reflect.TypeOf((*EtlManager)(nil).EventLoop)) } -// GetPipelineHeight mocks base method. -func (m *EtlManager) GetPipelineHeight(arg0 core.PUUID) (*big.Int, error) { +// GetHeightAtPipeline mocks base method. +func (m *EtlManager) GetHeightAtPipeline(arg0 core.PUUID) (*big.Int, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetPipelineHeight", arg0) + ret := m.ctrl.Call(m, "GetHeightAtPipeline", arg0) ret0, _ := ret[0].(*big.Int) ret1, _ := ret[1].(error) return ret0, ret1 } -// GetPipelineHeight indicates an expected call of GetPipelineHeight. -func (mr *EtlManagerMockRecorder) GetPipelineHeight(arg0 interface{}) *gomock.Call { +// GetHeightAtPipeline indicates an expected call of GetHeightAtPipeline. +func (mr *EtlManagerMockRecorder) GetHeightAtPipeline(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPipelineHeight", reflect.TypeOf((*EtlManager)(nil).GetPipelineHeight), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetHeightAtPipeline", reflect.TypeOf((*EtlManager)(nil).GetHeightAtPipeline), arg0) } // GetStateKey mocks base method. diff --git a/internal/mocks/oracle.go b/internal/mocks/oracle.go index 1f42eb9f..44611ba4 100644 --- a/internal/mocks/oracle.go +++ b/internal/mocks/oracle.go @@ -8,30 +8,24 @@ import ( "github.com/base-org/pessimism/internal/etl/component" ) -type mockOracleDefinition struct { +type mockTraversal struct { } -func (md *mockOracleDefinition) ConfigureRoutine(core.PUUID) error { +func (md *mockTraversal) ConfigureRoutine(core.PUUID) error { return nil } -func (md *mockOracleDefinition) BackTestRoutine(_ context.Context, _ chan core.TransitData, - _ *big.Int, _ *big.Int) error { +func (md *mockTraversal) Loop(_ context.Context, _ chan core.TransitData) error { return nil } -func (md *mockOracleDefinition) ReadRoutine(_ context.Context, _ chan core.TransitData) error { - return nil -} - -func (md *mockOracleDefinition) Height() (*big.Int, error) { +func (md *mockTraversal) Height() (*big.Int, error) { return big.NewInt(0), nil } -// NewDummyOracle ... Takes in a register type that specifies the mocked output type -// Useful for testing inter-component connectivity and higher level component management abstractions -func NewDummyOracle(ctx context.Context, ot core.RegisterType, opts ...component.Option) (component.Component, error) { - od := &mockOracleDefinition{} +// NewDummyReader +func NewDummyReader(ctx context.Context, ot core.RegisterType, opts ...component.Option) (component.Component, error) { + mt := &mockTraversal{} - return component.NewOracle(ctx, ot, od, opts...) + return component.NewReader(ctx, ot, mt, opts...) } diff --git a/internal/subsystem/manager.go b/internal/subsystem/manager.go index 0564b244..ee45934a 100644 --- a/internal/subsystem/manager.go +++ b/internal/subsystem/manager.go @@ -222,5 +222,5 @@ func (m *Manager) etlLimitReached() bool { } func (m *Manager) PipelineHeight(pUUID core.PUUID) (*big.Int, error) { - return m.etl.GetPipelineHeight(pUUID) + return m.etl.GetHeightAtPipeline(pUUID) } diff --git a/internal/subsystem/manager_test.go b/internal/subsystem/manager_test.go index fabd8f40..05ed98ee 100644 --- a/internal/subsystem/manager_test.go +++ b/internal/subsystem/manager_test.go @@ -51,7 +51,7 @@ func createTestSuite(t *testing.T) *testSuite { func Test_BuildDeployCfg(t *testing.T) { pConfig := &core.PipelineConfig{ Network: core.Layer1, - DataType: core.GethBlock, + DataType: core.BlockHeader, PipelineType: core.Live, ClientConfig: nil, } @@ -271,7 +271,7 @@ func Test_BuildPipelineCfg(t *testing.T) { constructor: func(t *testing.T) *testSuite { ts := createTestSuite(t) ts.mockEng.EXPECT().GetInputType(core.BalanceEnforcement). - Return(core.AccountBalance, testErr()). + Return(core.BlockHeader, testErr()). Times(1) return ts @@ -293,7 +293,7 @@ func Test_BuildPipelineCfg(t *testing.T) { constructor: func(t *testing.T) *testSuite { ts := createTestSuite(t) ts.mockEng.EXPECT().GetInputType(core.BalanceEnforcement). - Return(core.AccountBalance, nil). + Return(core.BlockHeader, nil). Times(1) return ts @@ -315,7 +315,7 @@ func Test_BuildPipelineCfg(t *testing.T) { constructor: func(t *testing.T) *testSuite { ts := createTestSuite(t) ts.mockEng.EXPECT().GetInputType(core.BalanceEnforcement). - Return(core.AccountBalance, nil). + Return(core.BlockHeader, nil). Times(1) return ts From b6569a08506ff1acd3b53f38119170c375da3236 Mon Sep 17 00:00:00 2001 From: Ethen Pociask Date: Thu, 2 Nov 2023 15:20:59 -0700 Subject: [PATCH 2/5] [etl_optimisations] block_traversal -> header_traversal --- internal/etl/registry/block.go | 12 ++++++------ internal/etl/registry/registry.go | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/internal/etl/registry/block.go b/internal/etl/registry/block.go index d696d530..7d430da1 100644 --- a/internal/etl/registry/block.go +++ b/internal/etl/registry/block.go @@ -19,7 +19,7 @@ const ( notFoundMsg = "not found" ) -type BlockTraversal struct { +type HeaderTraversal struct { n core.Network cUUID core.CUUID pUUID core.PUUID @@ -31,7 +31,7 @@ type BlockTraversal struct { stats metrics.Metricer } -func NewBlockTraversal(ctx context.Context, cfg *core.ClientConfig, +func NewHeaderTraversal(ctx context.Context, cfg *core.ClientConfig, opts ...component.Option) (component.Component, error) { clients, err := client.FromContext(ctx) if err != nil { @@ -56,7 +56,7 @@ func NewBlockTraversal(ctx context.Context, cfg *core.ClientConfig, // TODO - Support network confirmation counts ht := ix_node.NewHeaderTraversal(node, startHeader, big.NewInt(0)) - bt := &BlockTraversal{ + bt := &HeaderTraversal{ n: cfg.Network, client: node, traversal: ht, @@ -73,11 +73,11 @@ func NewBlockTraversal(ctx context.Context, cfg *core.ClientConfig, return reader, nil } -func (bt *BlockTraversal) Height() (*big.Int, error) { +func (bt *HeaderTraversal) Height() (*big.Int, error) { return bt.traversal.LastHeader().Number, nil } -func (bt *BlockTraversal) Backfill(start, end *big.Int, consumer chan core.TransitData) error { +func (bt *HeaderTraversal) Backfill(start, end *big.Int, consumer chan core.TransitData) error { for i := start; i.Cmp(end) < 0; i.Add(i, big.NewInt(batchSize)) { end := big.NewInt(0).Add(i, big.NewInt(batchSize)) @@ -99,7 +99,7 @@ func (bt *BlockTraversal) Backfill(start, end *big.Int, consumer chan core.Trans } // Loop ... -func (bt *BlockTraversal) Loop(ctx context.Context, consumer chan core.TransitData) error { +func (bt *HeaderTraversal) Loop(ctx context.Context, consumer chan core.TransitData) error { ticker := time.NewTicker(1 * time.Second) recent, err := bt.client.BlockHeaderByNumber(nil) diff --git a/internal/etl/registry/registry.go b/internal/etl/registry/registry.go index 100f8899..7f31a0b8 100644 --- a/internal/etl/registry/registry.go +++ b/internal/etl/registry/registry.go @@ -29,7 +29,7 @@ func NewRegistry() Registry { Addressing: false, DataType: core.BlockHeader, ComponentType: core.Reader, - Constructor: NewBlockTraversal, + Constructor: NewHeaderTraversal, Dependencies: noDeps(), Sk: noState(), From babd19a77eaeb86f293619b467b0be87632734b7 Mon Sep 17 00:00:00 2001 From: Ethen Pociask Date: Fri, 3 Nov 2023 01:33:38 -0700 Subject: [PATCH 3/5] [etl_optimisations] refactored testing --- e2e/alerting_test.go | 37 +++--- e2e/heuristic_test.go | 61 +++++++--- e2e/setup.go | 83 +++---------- internal/client/eth.go | 15 ++- internal/core/id_test.go | 4 +- internal/engine/engine.go | 5 +- .../engine/gomock_reflect_267190956/prog.go | 66 ++++++++++ internal/engine/manager_test.go | 96 --------------- internal/engine/registry/balance_enforce.go | 7 +- .../engine/registry/balance_enforce_test.go | 41 +++++-- internal/etl/pipeline/manager_test.go | 4 +- internal/etl/registry/event_test.go | 8 +- .../{block.go => header_traversal.go} | 42 +++---- internal/mocks/context.go | 7 ++ internal/mocks/eth_client.go | 115 +++++++++++++++++- 15 files changed, 350 insertions(+), 241 deletions(-) create mode 100644 internal/engine/gomock_reflect_267190956/prog.go delete mode 100644 internal/engine/manager_test.go rename internal/etl/registry/{block.go => header_traversal.go} (67%) diff --git a/e2e/alerting_test.go b/e2e/alerting_test.go index b66ec088..66a0acb7 100644 --- a/e2e/alerting_test.go +++ b/e2e/alerting_test.go @@ -91,15 +91,15 @@ func TestMultiDirectiveRouting(t *testing.T) { // balance enforcement heuristic session on L2 network with a cooldown. func TestCoolDown(t *testing.T) { - ts := e2e.CreateL2TestSuite(t) + ts := e2e.CreateSysTestSuite(t) defer ts.Close() - alice := ts.L2Cfg.Secrets.Addresses().Alice - bob := ts.L2Cfg.Secrets.Addresses().Bob + alice := ts.Cfg.Secrets.Addresses().Alice + bob := ts.Cfg.Secrets.Addresses().Bob alertMsg := "one baby to another says:" // Deploy a balance enforcement heuristic session for Alice using a cooldown. - _, err := ts.App.BootStrap([]*models.SessionRequestParams{{ + ids, err := ts.App.BootStrap([]*models.SessionRequestParams{{ Network: core.Layer2.String(), PType: core.Live.String(), HeuristicType: core.BalanceEnforcement.String(), @@ -120,21 +120,21 @@ func TestCoolDown(t *testing.T) { require.NoError(t, err, "Failed to bootstrap balance enforcement heuristic session") // Get Alice's balance. - aliceAmt, err := ts.L2Geth.L2Client.BalanceAt(context.Background(), alice, nil) + aliceAmt, err := ts.L2Client.BalanceAt(context.Background(), alice, nil) require.NoError(t, err, "Failed to get Alice's balance") // Determine the gas cost of the transaction. gasAmt := 1_000_001 bigAmt := big.NewInt(1_000_001) - gasPrice := big.NewInt(int64(ts.L2Cfg.DeployConfig.L2GenesisBlockGasLimit)) + gasPrice := big.NewInt(int64(ts.Cfg.DeployConfig.L2GenesisBlockGasLimit)) gasCost := gasPrice.Mul(gasPrice, bigAmt) - signer := types.LatestSigner(ts.L2Geth.L2ChainConfig) + signer := types.LatestSigner(ts.Sys.L2GenesisCfg.Config) // Create a transaction from Alice to Bob that will drain almost all of Alice's ETH. - drainAliceTx := types.MustSignNewTx(ts.L2Cfg.Secrets.Alice, signer, &types.DynamicFeeTx{ - ChainID: big.NewInt(int64(ts.L2Cfg.DeployConfig.L2ChainID)), + drainAliceTx := types.MustSignNewTx(ts.Cfg.Secrets.Alice, signer, &types.DynamicFeeTx{ + ChainID: big.NewInt(int64(ts.Cfg.DeployConfig.L2ChainID)), Nonce: 0, GasTipCap: big.NewInt(100), GasFeeCap: big.NewInt(100000), @@ -145,12 +145,21 @@ func TestCoolDown(t *testing.T) { Data: nil, }) - // Send the transaction to drain Alice's account of almost all ETH. - _, err = ts.L2Geth.AddL2Block(context.Background(), drainAliceTx) - require.NoError(t, err, "Failed to create L2 block with transaction") + err = ts.L2Client.SendTransaction(context.Background(), drainAliceTx) + require.NoError(t, err) + + receipt, err := wait.ForReceipt(context.Background(), ts.L2Client, drainAliceTx.Hash(), types.ReceiptStatusSuccessful) + require.NoError(t, err) + + require.NoError(t, wait.For(context.Background(), 500*time.Millisecond, func() (bool, error) { + pUUID := ids[0].PUUID + height, err := ts.Subsystems.PipelineHeight(pUUID) + if err != nil { + return false, err + } - // Wait for Pessimism to process the balance change and send a notification to the mocked Slack server. - time.Sleep(2 * time.Second) + return height != nil && height.Uint64() > receipt.BlockNumber.Uint64(), nil + })) // Check that the balance enforcement was triggered using the mocked server cache. posts := ts.TestSlackSvr.SlackAlerts() diff --git a/e2e/heuristic_test.go b/e2e/heuristic_test.go index b78d6dad..d9b72c12 100644 --- a/e2e/heuristic_test.go +++ b/e2e/heuristic_test.go @@ -31,15 +31,15 @@ import ( // balance enforcement heuristic session on L2 network. func TestBalanceEnforcement(t *testing.T) { - ts := e2e.CreateL2TestSuite(t) + ts := e2e.CreateSysTestSuite(t) defer ts.Close() - alice := ts.L2Cfg.Secrets.Addresses().Alice - bob := ts.L2Cfg.Secrets.Addresses().Bob + alice := ts.Cfg.Secrets.Addresses().Alice + bob := ts.Cfg.Secrets.Addresses().Bob alertMsg := "one baby to another says:" // Deploy a balance enforcement heuristic session for Alice. - _, err := ts.App.BootStrap([]*models.SessionRequestParams{{ + ids, err := ts.App.BootStrap([]*models.SessionRequestParams{{ Network: core.Layer2.String(), PType: core.Live.String(), HeuristicType: core.BalanceEnforcement.String(), @@ -58,21 +58,21 @@ func TestBalanceEnforcement(t *testing.T) { require.NoError(t, err, "Failed to bootstrap balance enforcement heuristic session") // Get Alice's balance. - aliceAmt, err := ts.L2Geth.L2Client.BalanceAt(context.Background(), alice, nil) + aliceAmt, err := ts.L2Client.BalanceAt(context.Background(), alice, nil) require.NoError(t, err, "Failed to get Alice's balance") // Determine the gas cost of the transaction. gasAmt := 1_000_001 bigAmt := big.NewInt(1_000_001) - gasPrice := big.NewInt(int64(ts.L2Cfg.DeployConfig.L2GenesisBlockGasLimit)) + gasPrice := big.NewInt(int64(ts.Cfg.DeployConfig.L2GenesisBlockGasLimit)) gasCost := gasPrice.Mul(gasPrice, bigAmt) - signer := types.LatestSigner(ts.L2Geth.L2ChainConfig) + signer := types.LatestSigner(ts.Sys.L2GenesisCfg.Config) // Create a transaction from Alice to Bob that will drain almost all of Alice's ETH. - drainAliceTx := types.MustSignNewTx(ts.L2Cfg.Secrets.Alice, signer, &types.DynamicFeeTx{ - ChainID: big.NewInt(int64(ts.L2Cfg.DeployConfig.L2ChainID)), + drainAliceTx := types.MustSignNewTx(ts.Cfg.Secrets.Alice, signer, &types.DynamicFeeTx{ + ChainID: big.NewInt(int64(ts.Cfg.DeployConfig.L2ChainID)), Nonce: 0, GasTipCap: big.NewInt(100), GasFeeCap: big.NewInt(100000), @@ -86,11 +86,23 @@ func TestBalanceEnforcement(t *testing.T) { require.Equal(t, len(ts.TestPagerDutyServer.PagerDutyAlerts()), 0, "No alerts should be sent before the transaction is sent") // Send the transaction to drain Alice's account of almost all ETH. - _, err = ts.L2Geth.AddL2Block(context.Background(), drainAliceTx) + + err = ts.L2Client.SendTransaction(context.Background(), drainAliceTx) + require.NoError(t, err) + + receipt, err := wait.ForReceipt(context.Background(), ts.L2Client, drainAliceTx.Hash(), types.ReceiptStatusSuccessful) require.NoError(t, err, "Failed to create L2 block with transaction") // Wait for Pessimism to process the balance change and send a notification to the mocked Slack server. - time.Sleep(1 * time.Second) + require.NoError(t, wait.For(context.Background(), 500*time.Millisecond, func() (bool, error) { + pUUID := ids[0].PUUID + height, err := ts.Subsystems.PipelineHeight(pUUID) + if err != nil { + return false, err + } + + return height != nil && height.Uint64() > receipt.BlockNumber.Uint64(), nil + })) // Check that the balance enforcement was triggered using the mocked server cache. pdMsgs := ts.TestPagerDutyServer.PagerDutyAlerts() @@ -100,12 +112,12 @@ func TestBalanceEnforcement(t *testing.T) { assert.Contains(t, pdMsgs[0].Payload.Summary, "balance_enforcement", "Balance enforcement alert was not sent") // Get Bobs's balance. - bobAmt, err := ts.L2Geth.L2Client.BalanceAt(context.Background(), bob, nil) + bobAmt, err := ts.L2Client.BalanceAt(context.Background(), bob, nil) require.NoError(t, err, "Failed to get Alice's balance") // Create a transaction to send the ETH back to Alice. - drainBobTx := types.MustSignNewTx(ts.L2Cfg.Secrets.Bob, signer, &types.DynamicFeeTx{ - ChainID: big.NewInt(int64(ts.L2Cfg.DeployConfig.L2ChainID)), + drainBobTx := types.MustSignNewTx(ts.Cfg.Secrets.Bob, signer, &types.DynamicFeeTx{ + ChainID: big.NewInt(int64(ts.Cfg.DeployConfig.L2ChainID)), Nonce: 0, GasTipCap: big.NewInt(100), GasFeeCap: big.NewInt(100000), @@ -116,11 +128,22 @@ func TestBalanceEnforcement(t *testing.T) { }) // Send the transaction to re-disperse the ETH from Bob back to Alice. - _, err = ts.L2Geth.AddL2Block(context.Background(), drainBobTx) - require.NoError(t, err, "Failed to create L2 block with transaction") + err = ts.L2Client.SendTransaction(context.Background(), drainBobTx) + require.NoError(t, err) - // Wait for Pessimism to process the balance change. - time.Sleep(1 * time.Second) + receipt, err = wait.ForReceipt(context.Background(), ts.L2Client, drainBobTx.Hash(), types.ReceiptStatusSuccessful) + require.NoError(t, err) + + // Wait for Pessimism to process the balance change and send a notification to the mocked Slack server. + require.NoError(t, wait.For(context.Background(), 500*time.Millisecond, func() (bool, error) { + pUUID := ids[0].PUUID + height, err := ts.Subsystems.PipelineHeight(pUUID) + if err != nil { + return false, err + } + + return height != nil && height.Uint64() > receipt.BlockNumber.Uint64(), nil + })) // Empty the mocked PagerDuty server cache. ts.TestPagerDutyServer.ClearAlerts() @@ -129,7 +152,7 @@ func TestBalanceEnforcement(t *testing.T) { time.Sleep(1 * time.Second) // Ensure that no new alerts were sent. - assert.Equal(t, len(ts.TestPagerDutyServer.Payloads), 0, "No alerts should be sent after the transaction is sent") + assert.Equal(t, 0, len(ts.TestPagerDutyServer.Payloads)) } // TestContractEvent ... Tests the E2E flow of a single diff --git a/e2e/setup.go b/e2e/setup.go index 2499e7e3..5a224e86 100644 --- a/e2e/setup.go +++ b/e2e/setup.go @@ -17,9 +17,9 @@ import ( "github.com/base-org/pessimism/internal/metrics" "github.com/base-org/pessimism/internal/mocks" "github.com/base-org/pessimism/internal/state" - "github.com/golang/mock/gomock" - "github.com/base-org/pessimism/internal/subsystem" + ix_node "github.com/ethereum-optimism/optimism/indexer/node" + "github.com/golang/mock/gomock" op_e2e "github.com/ethereum-optimism/optimism/op-e2e" "github.com/ethereum/go-ethereum/ethclient" @@ -65,73 +65,6 @@ type L2TestSuite struct { TestPagerDutyServer *TestPagerDutyServer } -// CreateSysTestSuite ... Creates a new L2Geth test suite -func CreateL2TestSuite(t *testing.T) *L2TestSuite { - ctx := context.Background() - nodeCfg := op_e2e.DefaultSystemConfig(t) - logging.New(core.Development) - - node, err := op_e2e.NewOpGeth(t, ctx, &nodeCfg) - if err != nil { - t.Fatal(err) - } - - if len(os.Getenv("ENABLE_ROLLUP_LOGS")) == 0 { - t.Log("set env 'ENABLE_ROLLUP_LOGS' to show rollup logs") - for name, logger := range nodeCfg.Loggers { - t.Logf("discarding logs for %s", name) - logger.SetHandler(log.DiscardHandler()) - } - } - - ss := state.NewMemState() - - bundle := &client.Bundle{ - L1Client: node.L2Client, - L2Client: node.L2Client, - } - ctx = app.InitializeContext(ctx, ss, bundle) - - appCfg := DefaultTestConfig() - - slackServer := NewTestSlackServer("127.0.0.1", 0) - - pagerdutyServer := NewTestPagerDutyServer("127.0.0.1", 0) - - slackURL := fmt.Sprintf("http://127.0.0.1:%d", slackServer.Port) - pagerdutyURL := fmt.Sprintf("http://127.0.0.1:%d", pagerdutyServer.Port) - - appCfg.AlertConfig.PagerdutyAlertEventsURL = pagerdutyURL - appCfg.AlertConfig.RoutingParams = DefaultRoutingParams(core.StringFromEnv(slackURL)) - - pess, kill, err := app.NewPessimismApp(ctx, appCfg) - if err != nil { - t.Fatal(err) - } - - if err := pess.Start(); err != nil { - t.Fatal(err) - } - - go pess.ListenForShutdown(kill) - - return &L2TestSuite{ - t: t, - L2Geth: node, - L2Cfg: &nodeCfg, - App: pess, - Close: func() { - kill() - node.Close() - slackServer.Close() - pagerdutyServer.Close() - }, - AppCfg: appCfg, - TestSlackSvr: slackServer, - TestPagerDutyServer: pagerdutyServer, - } -} - // CreateSysTestSuite ... Creates a new SysTestSuite func CreateSysTestSuite(t *testing.T) *SysTestSuite { t.Log("Creating system test suite") @@ -165,7 +98,19 @@ func CreateSysTestSuite(t *testing.T) *SysTestSuite { ctrl := gomock.NewController(t) ixClient := mocks.NewMockIxClient(ctrl) + l2NodeClient, err := ix_node.DialEthClient(sys.EthInstances["sequencer"].HTTPEndpoint(), metrics.NoopMetrics) + if err != nil { + t.Fatal(err) + } + + l1NodeClient, err := ix_node.DialEthClient(sys.EthInstances["l1"].HTTPEndpoint(), metrics.NoopMetrics) + if err != nil { + t.Fatal(err) + } + bundle := &client.Bundle{ + L1Node: l1NodeClient, + L2Node: l2NodeClient, L1Client: sys.Clients["l1"], L2Client: sys.Clients["sequencer"], L2Geth: gethClient, diff --git a/internal/client/eth.go b/internal/client/eth.go index 516e35ba..3d7760b1 100644 --- a/internal/client/eth.go +++ b/internal/client/eth.go @@ -1,4 +1,4 @@ -//go:generate mockgen -package mocks --destination ../mocks/eth_client.go . EthClient +//go:generate mockgen -package mocks --destination ../mocks/eth_client.go . EthClient,NodeClient package client @@ -27,12 +27,23 @@ type EthClient interface { ch chan<- types.Log) (ethereum.Subscription, error) } +type NodeClient interface { + BlockHeaderByNumber(*big.Int) (*types.Header, error) + BlockHeaderByHash(common.Hash) (*types.Header, error) + BlockHeadersByRange(*big.Int, *big.Int) ([]types.Header, error) + + TxByHash(common.Hash) (*types.Transaction, error) + + StorageHash(common.Address, *big.Int) (common.Hash, error) + FilterLogs(ethereum.FilterQuery) ([]types.Log, error) +} + // NewEthClient ... Initializer func NewEthClient(ctx context.Context, rawURL string) (EthClient, error) { return ethclient.DialContext(ctx, rawURL) } -func NewNodeClient(ctx context.Context, rpcURL string) (ix_node.EthClient, error) { +func NewNodeClient(ctx context.Context, rpcURL string) (NodeClient, error) { stats := metrics.WithContext(ctx) return ix_node.DialEthClient(rpcURL, stats) diff --git a/internal/core/id_test.go b/internal/core/id_test.go index c9b32bde..253bd0d5 100644 --- a/internal/core/id_test.go +++ b/internal/core/id_test.go @@ -14,7 +14,7 @@ func Test_Component_ID(t *testing.T) { assert.Equal(t, expectedPID, actualID.PID) - expectedStr := "layer1:backtest:reader:account_balance" + expectedStr := "layer1:backtest:reader:block_header" actualStr := actualID.PID.String() assert.Equal(t, expectedStr, actualStr) @@ -28,7 +28,7 @@ func Test_Pipeline_ID(t *testing.T) { assert.Equal(t, expectedID, actualID.PID) - expectedStr := "backtest::layer1:backtest:reader:account_balance::layer1:backtest:reader:account_balance" + expectedStr := "backtest::layer1:backtest:reader:block_header::layer1:backtest:reader:block_header" actualStr := actualID.PID.String() assert.Equal(t, expectedStr, actualStr) diff --git a/internal/engine/engine.go b/internal/engine/engine.go index ede6a1fa..1190ec43 100644 --- a/internal/engine/engine.go +++ b/internal/engine/engine.go @@ -96,7 +96,7 @@ func (hce *hardCodedEngine) EventLoop(ctx context.Context) { case execInput := <-hce.heuristicIn: // Heuristic input received logger.Debug("Heuristic input received", zap.String(logging.SUUIDKey, execInput.h.SUUID().String())) - // (1) Execute heuristic with retry strategy + start := time.Now() var actSet *heuristic.ActivationSet @@ -105,7 +105,6 @@ func (hce *hardCodedEngine) EventLoop(ctx context.Context) { if _, err := retry.Do[any](ctx, 10, retryStrategy, func() (any, error) { actSet = hce.Execute(ctx, execInput.hi.Input, execInput.h) metrics.WithContext(ctx).RecordHeuristicRun(execInput.h) - metrics.WithContext(ctx).RecordInvExecutionTime(execInput.h, float64(time.Since(start).Nanoseconds())) // a-ok! return 0, nil }); err != nil { @@ -113,7 +112,7 @@ func (hce *hardCodedEngine) EventLoop(ctx context.Context) { metrics.WithContext(ctx).RecordAssessmentError(execInput.h) } - // (2) Send alerts for respective activations + metrics.WithContext(ctx).RecordInvExecutionTime(execInput.h, float64(time.Since(start).Nanoseconds())) if actSet.Activated() { for _, act := range actSet.Entries() { alert := core.Alert{ diff --git a/internal/engine/gomock_reflect_267190956/prog.go b/internal/engine/gomock_reflect_267190956/prog.go new file mode 100644 index 00000000..624ed59f --- /dev/null +++ b/internal/engine/gomock_reflect_267190956/prog.go @@ -0,0 +1,66 @@ + +package main + +import ( + "encoding/gob" + "flag" + "fmt" + "os" + "path" + "reflect" + + "github.com/golang/mock/mockgen/model" + + pkg_ "github.com/base-org/pessimism/internal/engine" +) + +var output = flag.String("output", "", "The output file name, or empty to use stdout.") + +func main() { + flag.Parse() + + its := []struct{ + sym string + typ reflect.Type + }{ + + { "Manager", reflect.TypeOf((*pkg_.Manager)(nil)).Elem()}, + + } + pkg := &model.Package{ + // NOTE: This behaves contrary to documented behaviour if the + // package name is not the final component of the import path. + // The reflect package doesn't expose the package name, though. + Name: path.Base("github.com/base-org/pessimism/internal/engine"), + } + + for _, it := range its { + intf, err := model.InterfaceFromInterfaceType(it.typ) + if err != nil { + fmt.Fprintf(os.Stderr, "Reflection: %v\n", err) + os.Exit(1) + } + intf.Name = it.sym + pkg.Interfaces = append(pkg.Interfaces, intf) + } + + outfile := os.Stdout + if len(*output) != 0 { + var err error + outfile, err = os.Create(*output) + if err != nil { + fmt.Fprintf(os.Stderr, "failed to open output file %q", *output) + } + defer func() { + if err := outfile.Close(); err != nil { + fmt.Fprintf(os.Stderr, "failed to close output file %q", *output) + os.Exit(1) + } + }() + } + + if err := gob.NewEncoder(outfile).Encode(pkg); err != nil { + fmt.Fprintf(os.Stderr, "gob encode: %v\n", err) + os.Exit(1) + } +} diff --git a/internal/engine/manager_test.go b/internal/engine/manager_test.go deleted file mode 100644 index 3d017c51..00000000 --- a/internal/engine/manager_test.go +++ /dev/null @@ -1,96 +0,0 @@ -package engine_test - -import ( - "context" - "sync" - "testing" - "time" - - "github.com/base-org/pessimism/internal/core" - "github.com/base-org/pessimism/internal/engine" - "github.com/base-org/pessimism/internal/engine/heuristic" - "github.com/base-org/pessimism/internal/engine/registry" - "github.com/base-org/pessimism/internal/state" - "github.com/ethereum/go-ethereum/common" - "github.com/stretchr/testify/assert" -) - -func Test_EventLoop(t *testing.T) { - // Setup test dependencies - alertChan := make(chan core.Alert) - testPUUID := core.NilPUUID() - - ctx := context.Background() - ss := state.NewMemState() - - ctx = context.WithValue(ctx, core.State, ss) - - em := engine.NewManager(ctx, - &engine.Config{WorkerCount: 1}, - engine.NewHardCodedEngine(alertChan), - engine.NewAddressingMap(), - engine.NewSessionStore(), - registry.NewHeuristicTable(), - alertChan, - ) - - ingress := em.Transit() - - // Spinup event loop routine w/ closure - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - _ = em.EventLoop() - }() - - defer func() { - _ = em.Shutdown() - wg.Wait() - }() - - isp := core.NewSessionParams() - isp.SetValue("address", common.HexToAddress("0x69").String()) - isp.SetValue("upper", 420) - - // Deploy heuristic session - deployCfg := &heuristic.DeployConfig{ - HeuristicType: core.BalanceEnforcement, - Network: core.Layer1, - Stateful: true, - StateKey: &core.StateKey{}, - AlertingPolicy: &core.AlertPolicy{ - Dest: core.Slack.String(), - }, - Params: isp, - PUUID: testPUUID, - } - - suuid, err := em.DeployHeuristicSession(deployCfg) - assert.NoError(t, err) - assert.NotNil(t, suuid) - - // Construct heuristic input - hi := core.HeuristicInput{ - PUUID: testPUUID, - Input: core.TransitData{ - Type: core.BlockHeader, - Address: common.HexToAddress("0x69"), - Value: float64(666), - }, - } - - // Send heuristic input to event loop - ingress <- hi - ticker := time.NewTicker(1 * time.Second) - - // Receive alert from event loop - select { - case <-ticker.C: - assert.FailNow(t, "Timed out waiting for alert data") - - case alert := <-alertChan: - assert.NotNil(t, alert) - assert.Equal(t, alert.PUUID, testPUUID) - } -} diff --git a/internal/engine/registry/balance_enforce.go b/internal/engine/registry/balance_enforce.go index fd34a783..a03d60fc 100644 --- a/internal/engine/registry/balance_enforce.go +++ b/internal/engine/registry/balance_enforce.go @@ -6,10 +6,13 @@ import ( "fmt" "time" + "github.com/base-org/pessimism/internal/common/math" + "github.com/base-org/pessimism/internal/client" "github.com/base-org/pessimism/internal/core" "github.com/base-org/pessimism/internal/engine/heuristic" "github.com/base-org/pessimism/internal/logging" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "go.uber.org/zap" ) @@ -71,12 +74,12 @@ func (bi *BalanceHeuristic) Assess(td core.TransitData) (*heuristic.ActivationSe } // See if a tx changed the balance for the address - balance, err := client.BalanceAt(context.Background(), td.Address, header.Number) + balance, err := client.BalanceAt(bi.ctx, common.HexToAddress(bi.cfg.Address), header.Number) if err != nil { return nil, err } - ethBalance := float64(balance.Int64()) / 1000000000000000000 + ethBalance, _ := math.WeiToEther(balance).Float64() activated := false diff --git a/internal/engine/registry/balance_enforce_test.go b/internal/engine/registry/balance_enforce_test.go index 4ba0b2b0..54294b5d 100644 --- a/internal/engine/registry/balance_enforce_test.go +++ b/internal/engine/registry/balance_enforce_test.go @@ -2,10 +2,16 @@ package registry_test import ( "context" + "math/big" "testing" "github.com/base-org/pessimism/internal/core" "github.com/base-org/pessimism/internal/engine/registry" + "github.com/base-org/pessimism/internal/mocks" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + + "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" ) @@ -13,7 +19,9 @@ func Test_Balance_Assess(t *testing.T) { upper := float64(5) lower := float64(1) - bi, err := registry.NewBalanceHeuristic(context.Background(), + ctx, ms := mocks.Context(context.Background(), gomock.NewController(t)) + + bi, err := registry.NewBalanceHeuristic(ctx, ®istry.BalanceInvConfig{ Address: "0x123", UpperBound: &upper, @@ -22,31 +30,50 @@ func Test_Balance_Assess(t *testing.T) { assert.NoError(t, err) + num := big.NewInt(1) // No activation testData1 := core.TransitData{ - Type: core.BlockHeader, - Value: float64(3), + Network: core.Layer1, + Type: core.BlockHeader, + Value: types.Header{ + Number: num, + }, } + ms.MockL1.EXPECT(). + BalanceAt(ctx, common.HexToAddress("0x123"), num).Return(big.NewInt(3000000000000000000), nil).Times(1) as, err := bi.Assess(testData1) assert.NoError(t, err) assert.False(t, as.Activated()) // Upper bound activation + num = num.Add(num, big.NewInt(1)) testData2 := core.TransitData{ - Type: core.BlockHeader, - Value: float64(6), + Network: core.Layer1, + Type: core.BlockHeader, + Value: types.Header{ + Number: num, + }, } + ms.MockL1.EXPECT(). + BalanceAt(ctx, common.HexToAddress("0x123"), num).Return(big.NewInt(6000000000000000000), nil).Times(1) + as, err = bi.Assess(testData2) assert.NoError(t, err) assert.True(t, as.Activated()) + num = num.Add(num, big.NewInt(1)) // Lower bound activation testData3 := core.TransitData{ - Type: core.BlockHeader, - Value: float64(0.1), + Network: core.Layer1, + Type: core.BlockHeader, + Value: types.Header{ + Number: num, + }, } + ms.MockL1.EXPECT(). + BalanceAt(ctx, common.HexToAddress("0x123"), num).Return(big.NewInt(600000000000000000), nil).Times(1) as, err = bi.Assess(testData3) assert.NoError(t, err) diff --git a/internal/etl/pipeline/manager_test.go b/internal/etl/pipeline/manager_test.go index 33ee6411..6901610d 100644 --- a/internal/etl/pipeline/manager_test.go +++ b/internal/etl/pipeline/manager_test.go @@ -67,7 +67,9 @@ func Test_Manager(t *testing.T) { reg := registry.NewRegistry() ctrl := gomock.NewController(t) - ctx, _ := mocks.Context(context.Background(), ctrl) + ctx, ms := mocks.Context(context.Background(), ctrl) + + ms.MockL1Node.EXPECT().BlockHeaderByNumber(gomock.Any()).Return(nil, fmt.Errorf("keep going")).AnyTimes() ctx = context.WithValue(ctx, core.State, state.NewMemState()) diff --git a/internal/etl/registry/event_test.go b/internal/etl/registry/event_test.go index ccb59978..7833dddc 100644 --- a/internal/etl/registry/event_test.go +++ b/internal/etl/registry/event_test.go @@ -71,7 +71,7 @@ func TestEventLogPipe(t *testing.T) { suite.mockSuite.MockL1.EXPECT().FilterLogs(gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("unknown block")) _, err := suite.def.Transform(suite.ctx, core.TransitData{ - Value: *types.NewBlockWithHeader(&types.Header{})}) + Value: types.Header{}}) assert.Error(t, err) }, }, @@ -86,7 +86,7 @@ func TestEventLogPipe(t *testing.T) { suite.mockSuite.MockL1.EXPECT().FilterLogs(gomock.Any(), gomock.Any()).Return(nil, nil) tds, err := suite.def.Transform(suite.ctx, core.TransitData{ - Value: *types.NewBlockWithHeader(&types.Header{}), + Value: types.Header{}, }) assert.NoError(t, err) assert.Empty(t, tds) @@ -106,7 +106,7 @@ func TestEventLogPipe(t *testing.T) { Return(nil, fmt.Errorf("unknown block")) tds, err := suite.def.Transform(suite.ctx, core.TransitData{ - Value: *types.NewBlockWithHeader(&types.Header{}), + Value: types.Header{}, }) assert.Error(t, err) assert.Empty(t, tds) @@ -129,7 +129,7 @@ func TestEventLogPipe(t *testing.T) { Return([]types.Log{log2}, nil) tds, err = suite.def.Transform(suite.ctx, core.TransitData{ - Value: *types.NewBlockWithHeader(&types.Header{}), + Value: types.Header{}, }) assert.NoError(t, err) diff --git a/internal/etl/registry/block.go b/internal/etl/registry/header_traversal.go similarity index 67% rename from internal/etl/registry/block.go rename to internal/etl/registry/header_traversal.go index 7d430da1..3f1321c1 100644 --- a/internal/etl/registry/block.go +++ b/internal/etl/registry/header_traversal.go @@ -8,9 +8,11 @@ import ( "github.com/base-org/pessimism/internal/client" "github.com/base-org/pessimism/internal/core" "github.com/base-org/pessimism/internal/etl/component" + "github.com/base-org/pessimism/internal/logging" "github.com/base-org/pessimism/internal/metrics" ix_node "github.com/ethereum-optimism/optimism/indexer/node" "github.com/ethereum/go-ethereum/core/types" + "go.uber.org/zap" ) const ( @@ -54,34 +56,33 @@ func NewHeaderTraversal(ctx context.Context, cfg *core.ClientConfig, } // TODO - Support network confirmation counts - ht := ix_node.NewHeaderTraversal(node, startHeader, big.NewInt(0)) - bt := &HeaderTraversal{ + ht := &HeaderTraversal{ n: cfg.Network, client: node, - traversal: ht, + traversal: ix_node.NewHeaderTraversal(node, startHeader, big.NewInt(0)), pollInterval: time.Duration(cfg.PollInterval) * time.Millisecond, } - reader, err := component.NewReader(ctx, core.BlockHeader, bt, opts...) + reader, err := component.NewReader(ctx, core.BlockHeader, ht, opts...) if err != nil { return nil, err } - bt.cUUID = reader.UUID() - bt.pUUID = reader.PUUID() + ht.cUUID = reader.UUID() + ht.pUUID = reader.PUUID() return reader, nil } -func (bt *HeaderTraversal) Height() (*big.Int, error) { - return bt.traversal.LastHeader().Number, nil +func (ht *HeaderTraversal) Height() (*big.Int, error) { + return ht.traversal.LastHeader().Number, nil } -func (bt *HeaderTraversal) Backfill(start, end *big.Int, consumer chan core.TransitData) error { +func (ht *HeaderTraversal) Backfill(start, end *big.Int, consumer chan core.TransitData) error { for i := start; i.Cmp(end) < 0; i.Add(i, big.NewInt(batchSize)) { end := big.NewInt(0).Add(i, big.NewInt(batchSize)) - headers, err := bt.client.BlockHeadersByRange(i, end) + headers, err := ht.client.BlockHeadersByRange(i, end) if err != nil { return err } @@ -99,40 +100,40 @@ func (bt *HeaderTraversal) Backfill(start, end *big.Int, consumer chan core.Tran } // Loop ... -func (bt *HeaderTraversal) Loop(ctx context.Context, consumer chan core.TransitData) error { +func (ht *HeaderTraversal) Loop(ctx context.Context, consumer chan core.TransitData) error { ticker := time.NewTicker(1 * time.Second) - recent, err := bt.client.BlockHeaderByNumber(nil) + recent, err := ht.client.BlockHeaderByNumber(nil) if err != nil { - return err + logging.NoContext().Error("Failed to get latest header", zap.Error(err)) } // backfill if provided starting header - if bt.traversal.LastHeader() != nil { + if ht.traversal.LastHeader() != nil { - bt.Backfill(bt.traversal.LastHeader().Number, recent.Number, consumer) + ht.Backfill(ht.traversal.LastHeader().Number, recent.Number, consumer) } else { - bt.traversal = ix_node.NewHeaderTraversal(bt.client, recent, big.NewInt(0)) + ht.traversal = ix_node.NewHeaderTraversal(ht.client, recent, big.NewInt(0)) } for { select { case <-ticker.C: - header, err := bt.client.BlockHeaderByNumber(nil) + header, err := ht.client.BlockHeaderByNumber(nil) if err != nil { return err } - if header.Number.Cmp(bt.traversal.LastHeader().Number) > 0 { - headers, err := bt.traversal.NextFinalizedHeaders(batchSize) + if header.Number.Cmp(ht.traversal.LastHeader().Number) > 0 { + headers, err := ht.traversal.NextFinalizedHeaders(batchSize) if err != nil { return err } for _, header := range headers { consumer <- core.TransitData{ - Network: bt.n, + Network: ht.n, Timestamp: time.Now(), Type: core.BlockHeader, Value: header, @@ -144,5 +145,4 @@ func (bt *HeaderTraversal) Loop(ctx context.Context, consumer chan core.TransitD return nil } } - } diff --git a/internal/mocks/context.go b/internal/mocks/context.go index c49b3fb0..c565ff70 100644 --- a/internal/mocks/context.go +++ b/internal/mocks/context.go @@ -13,8 +13,10 @@ type MockSuite struct { Ctrl *gomock.Controller Bundle *client.Bundle MockIndexer *MockIxClient + MockL1Node *MockNodeClient MockL1 *MockEthClient MockL2 *MockEthClient + MockL2Node *MockNodeClient SS state.Store } @@ -23,13 +25,16 @@ func Context(ctx context.Context, ctrl *gomock.Controller) (context.Context, *Mo // 1. Construct mocked bundle mockedClient := NewMockEthClient(ctrl) mockedIndexer := NewMockIxClient(ctrl) + mockedNode := NewMockNodeClient(ctrl) ss := state.NewMemState() bundle := &client.Bundle{ IxClient: mockedIndexer, L1Client: mockedClient, + L1Node: mockedNode, L2Client: mockedClient, + L2Node: mockedNode, } // 2. Bind to context @@ -42,7 +47,9 @@ func Context(ctx context.Context, ctrl *gomock.Controller) (context.Context, *Mo Bundle: bundle, MockIndexer: mockedIndexer, MockL1: mockedClient, + MockL1Node: mockedNode, MockL2: mockedClient, + MockL2Node: mockedNode, SS: ss, } diff --git a/internal/mocks/eth_client.go b/internal/mocks/eth_client.go index 07210a0b..f33e2061 100644 --- a/internal/mocks/eth_client.go +++ b/internal/mocks/eth_client.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/base-org/pessimism/internal/client (interfaces: EthClient) +// Source: github.com/base-org/pessimism/internal/client (interfaces: EthClient,NodeClient) // Package mocks is a generated GoMock package. package mocks @@ -142,3 +142,116 @@ func (mr *MockEthClientMockRecorder) SubscribeFilterLogs(arg0, arg1, arg2 interf mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribeFilterLogs", reflect.TypeOf((*MockEthClient)(nil).SubscribeFilterLogs), arg0, arg1, arg2) } + +// MockNodeClient is a mock of NodeClient interface. +type MockNodeClient struct { + ctrl *gomock.Controller + recorder *MockNodeClientMockRecorder +} + +// MockNodeClientMockRecorder is the mock recorder for MockNodeClient. +type MockNodeClientMockRecorder struct { + mock *MockNodeClient +} + +// NewMockNodeClient creates a new mock instance. +func NewMockNodeClient(ctrl *gomock.Controller) *MockNodeClient { + mock := &MockNodeClient{ctrl: ctrl} + mock.recorder = &MockNodeClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockNodeClient) EXPECT() *MockNodeClientMockRecorder { + return m.recorder +} + +// BlockHeaderByHash mocks base method. +func (m *MockNodeClient) BlockHeaderByHash(arg0 common.Hash) (*types.Header, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "BlockHeaderByHash", arg0) + ret0, _ := ret[0].(*types.Header) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// BlockHeaderByHash indicates an expected call of BlockHeaderByHash. +func (mr *MockNodeClientMockRecorder) BlockHeaderByHash(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BlockHeaderByHash", reflect.TypeOf((*MockNodeClient)(nil).BlockHeaderByHash), arg0) +} + +// BlockHeaderByNumber mocks base method. +func (m *MockNodeClient) BlockHeaderByNumber(arg0 *big.Int) (*types.Header, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "BlockHeaderByNumber", arg0) + ret0, _ := ret[0].(*types.Header) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// BlockHeaderByNumber indicates an expected call of BlockHeaderByNumber. +func (mr *MockNodeClientMockRecorder) BlockHeaderByNumber(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BlockHeaderByNumber", reflect.TypeOf((*MockNodeClient)(nil).BlockHeaderByNumber), arg0) +} + +// BlockHeadersByRange mocks base method. +func (m *MockNodeClient) BlockHeadersByRange(arg0, arg1 *big.Int) ([]types.Header, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "BlockHeadersByRange", arg0, arg1) + ret0, _ := ret[0].([]types.Header) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// BlockHeadersByRange indicates an expected call of BlockHeadersByRange. +func (mr *MockNodeClientMockRecorder) BlockHeadersByRange(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BlockHeadersByRange", reflect.TypeOf((*MockNodeClient)(nil).BlockHeadersByRange), arg0, arg1) +} + +// FilterLogs mocks base method. +func (m *MockNodeClient) FilterLogs(arg0 ethereum.FilterQuery) ([]types.Log, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FilterLogs", arg0) + ret0, _ := ret[0].([]types.Log) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FilterLogs indicates an expected call of FilterLogs. +func (mr *MockNodeClientMockRecorder) FilterLogs(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FilterLogs", reflect.TypeOf((*MockNodeClient)(nil).FilterLogs), arg0) +} + +// StorageHash mocks base method. +func (m *MockNodeClient) StorageHash(arg0 common.Address, arg1 *big.Int) (common.Hash, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StorageHash", arg0, arg1) + ret0, _ := ret[0].(common.Hash) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// StorageHash indicates an expected call of StorageHash. +func (mr *MockNodeClientMockRecorder) StorageHash(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StorageHash", reflect.TypeOf((*MockNodeClient)(nil).StorageHash), arg0, arg1) +} + +// TxByHash mocks base method. +func (m *MockNodeClient) TxByHash(arg0 common.Hash) (*types.Transaction, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TxByHash", arg0) + ret0, _ := ret[0].(*types.Transaction) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// TxByHash indicates an expected call of TxByHash. +func (mr *MockNodeClientMockRecorder) TxByHash(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TxByHash", reflect.TypeOf((*MockNodeClient)(nil).TxByHash), arg0) +} From 00233e2acc43f48282067980217cec56a919285e Mon Sep 17 00:00:00 2001 From: Ethen Pociask Date: Fri, 3 Nov 2023 01:34:15 -0700 Subject: [PATCH 4/5] [etl_optimisations] rm unnecessary files --- .../engine/gomock_reflect_267190956/prog.go | 66 ------------------- 1 file changed, 66 deletions(-) delete mode 100644 internal/engine/gomock_reflect_267190956/prog.go diff --git a/internal/engine/gomock_reflect_267190956/prog.go b/internal/engine/gomock_reflect_267190956/prog.go deleted file mode 100644 index 624ed59f..00000000 --- a/internal/engine/gomock_reflect_267190956/prog.go +++ /dev/null @@ -1,66 +0,0 @@ - -package main - -import ( - "encoding/gob" - "flag" - "fmt" - "os" - "path" - "reflect" - - "github.com/golang/mock/mockgen/model" - - pkg_ "github.com/base-org/pessimism/internal/engine" -) - -var output = flag.String("output", "", "The output file name, or empty to use stdout.") - -func main() { - flag.Parse() - - its := []struct{ - sym string - typ reflect.Type - }{ - - { "Manager", reflect.TypeOf((*pkg_.Manager)(nil)).Elem()}, - - } - pkg := &model.Package{ - // NOTE: This behaves contrary to documented behaviour if the - // package name is not the final component of the import path. - // The reflect package doesn't expose the package name, though. - Name: path.Base("github.com/base-org/pessimism/internal/engine"), - } - - for _, it := range its { - intf, err := model.InterfaceFromInterfaceType(it.typ) - if err != nil { - fmt.Fprintf(os.Stderr, "Reflection: %v\n", err) - os.Exit(1) - } - intf.Name = it.sym - pkg.Interfaces = append(pkg.Interfaces, intf) - } - - outfile := os.Stdout - if len(*output) != 0 { - var err error - outfile, err = os.Create(*output) - if err != nil { - fmt.Fprintf(os.Stderr, "failed to open output file %q", *output) - } - defer func() { - if err := outfile.Close(); err != nil { - fmt.Fprintf(os.Stderr, "failed to close output file %q", *output) - os.Exit(1) - } - }() - } - - if err := gob.NewEncoder(outfile).Encode(pkg); err != nil { - fmt.Fprintf(os.Stderr, "gob encode: %v\n", err) - os.Exit(1) - } -} From bf3edcd09d1491e556e310d0624e83a084911565 Mon Sep 17 00:00:00 2001 From: Ethen Pociask Date: Fri, 3 Nov 2023 01:43:30 -0700 Subject: [PATCH 5/5] [etl_optimisations] Fix lint errors --- e2e/setup.go | 15 --------------- internal/client/client.go | 1 - internal/core/etl.go | 1 - internal/core/register.go | 1 - internal/engine/registry/balance_enforce.go | 6 ++---- internal/etl/pipeline/types.go | 2 -- internal/etl/registry/header_traversal.go | 11 ++++++----- internal/metrics/metrics.go | 4 ++-- 8 files changed, 10 insertions(+), 31 deletions(-) diff --git a/e2e/setup.go b/e2e/setup.go index 5a224e86..4bd8d5b3 100644 --- a/e2e/setup.go +++ b/e2e/setup.go @@ -50,21 +50,6 @@ type SysTestSuite struct { L2Client *ethclient.Client } -// L2TestSuite ... Stores all the information needed to run an e2e L2Geth test -type L2TestSuite struct { - t *testing.T - - L2Geth *op_e2e.OpGeth - L2Cfg *op_e2e.SystemConfig - - App *app.Application - AppCfg *config.Config - Close func() - - TestSlackSvr *TestSlackServer - TestPagerDutyServer *TestPagerDutyServer -} - // CreateSysTestSuite ... Creates a new SysTestSuite func CreateSysTestSuite(t *testing.T) *SysTestSuite { t.Log("Creating system test suite") diff --git a/internal/client/client.go b/internal/client/client.go index e118ecbb..dc843f41 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -100,7 +100,6 @@ func (b *Bundle) NodeClient(n core.Network) (ix_node.EthClient, error) { default: return nil, fmt.Errorf("invalid network supplied") } - } // FromNetwork ... Retrieves an eth client from the context diff --git a/internal/core/etl.go b/internal/core/etl.go index 32b05c02..22f1194f 100644 --- a/internal/core/etl.go +++ b/internal/core/etl.go @@ -16,7 +16,6 @@ func (ct ComponentType) String() string { case Transformer: return "transformer" - } return UnknownType diff --git a/internal/core/register.go b/internal/core/register.go index ca60b436..96232589 100644 --- a/internal/core/register.go +++ b/internal/core/register.go @@ -12,7 +12,6 @@ const ( // register enum func (rt RegisterType) String() string { switch rt { - case BlockHeader: return "block_header" diff --git a/internal/engine/registry/balance_enforce.go b/internal/engine/registry/balance_enforce.go index a03d60fc..1f748cac 100644 --- a/internal/engine/registry/balance_enforce.go +++ b/internal/engine/registry/balance_enforce.go @@ -31,9 +31,8 @@ func (bi *BalanceInvConfig) Unmarshal(isp *core.SessionParams) error { // BalanceHeuristic ... type BalanceHeuristic struct { - ctx context.Context - cfg *BalanceInvConfig - client client.EthClient + ctx context.Context + cfg *BalanceInvConfig heuristic.Heuristic } @@ -50,7 +49,6 @@ const reportMsg = ` // NewBalanceHeuristic ... Initializer func NewBalanceHeuristic(ctx context.Context, cfg *BalanceInvConfig) (heuristic.Heuristic, error) { - return &BalanceHeuristic{ ctx: ctx, cfg: cfg, diff --git a/internal/etl/pipeline/types.go b/internal/etl/pipeline/types.go index 9c4abfb1..47f4ca33 100644 --- a/internal/etl/pipeline/types.go +++ b/internal/etl/pipeline/types.go @@ -41,6 +41,4 @@ const ( emptyPipelineError = "pipeline must contain at least one component" // Manager error constants unknownCompType = "unknown component type %s provided" - - noAggregatorErr = "aggregator component has yet to be implemented" ) diff --git a/internal/etl/registry/header_traversal.go b/internal/etl/registry/header_traversal.go index 3f1321c1..757242d2 100644 --- a/internal/etl/registry/header_traversal.go +++ b/internal/etl/registry/header_traversal.go @@ -16,9 +16,8 @@ import ( ) const ( + // This could be configurable in the future batchSize = 100 - - notFoundMsg = "not found" ) type HeaderTraversal struct { @@ -61,7 +60,7 @@ func NewHeaderTraversal(ctx context.Context, cfg *core.ClientConfig, n: cfg.Network, client: node, traversal: ix_node.NewHeaderTraversal(node, startHeader, big.NewInt(0)), - pollInterval: time.Duration(cfg.PollInterval) * time.Millisecond, + pollInterval: cfg.PollInterval * time.Millisecond, } reader, err := component.NewReader(ctx, core.BlockHeader, ht, opts...) @@ -110,8 +109,10 @@ func (ht *HeaderTraversal) Loop(ctx context.Context, consumer chan core.TransitD // backfill if provided starting header if ht.traversal.LastHeader() != nil { - - ht.Backfill(ht.traversal.LastHeader().Number, recent.Number, consumer) + err = ht.Backfill(ht.traversal.LastHeader().Number, recent.Number, consumer) + if err != nil { + return err + } } else { ht.traversal = ix_node.NewHeaderTraversal(ht.client, recent, big.NewInt(0)) } diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 81403697..75f57b33 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -325,10 +325,10 @@ func (n *noopMetricer) RecordNodeError(_ core.Network) func (n *noopMetricer) RecordBlockLatency(_ core.Network, _ float64) {} func (n *noopMetricer) RecordPipelineLatency(_ core.PUUID, _ float64) {} func (n *noopMetricer) RecordAssessmentError(_ heuristic.Heuristic) {} -func (n *noopMetricer) RecordRPCClientRequest(method string) func(err error) { +func (n *noopMetricer) RecordRPCClientRequest(_ string) func(err error) { return func(err error) {} } -func (n *noopMetricer) RecordRPCClientBatchRequest(b []rpc.BatchElem) func(err error) { +func (n *noopMetricer) RecordRPCClientBatchRequest(_ []rpc.BatchElem) func(err error) { return func(err error) {} }