diff --git a/README.md b/README.md index 8fddda0..9bdb3e2 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,16 @@ Running the relayer noble-cctp-relayer start --config ./config/sample-app-config.yaml ``` Sample configs can be found in [config](config). -### Promethius Metrics + +### Flush Interval + +Using the `--flush-interval` flag will run a flush on all paths every `duration`; ex `--flush-interval 5m` + +The relayer will keep track of the latest flushed block. The first time the flush is run, the flush will start at the chains latest height - lookback period and flush up until height of the chain when the flush started. It will then store the height the flush ended on. + +After that, it will flush from the last stored height - lookback period up until the latest height of the chain. + +### Prometheus Metrics By default, metrics are exported at on port :2112/metrics (`http://localhost:2112/metrics`). You can customize the port using the `--metrics-port` flag. diff --git a/cmd/appstate.go b/cmd/appstate.go index 732c6d3..dc1f08f 100644 --- a/cmd/appstate.go +++ b/cmd/appstate.go @@ -63,10 +63,10 @@ func (a *AppState) loadConfigFile() { } config, err := ParseConfig(a.ConfigPath) if err != nil { - a.Logger.Error("unable to parse config file", "location", a.ConfigPath, "err", err) + a.Logger.Error("Unable to parse config file", "location", a.ConfigPath, "err", err) os.Exit(1) } - a.Logger.Info("successfully parsed config file", "location", a.ConfigPath) + a.Logger.Info("Successfully parsed config file", "location", a.ConfigPath) a.Config = config } diff --git a/cmd/process.go b/cmd/process.go index a805945..4aa8bc8 100644 --- a/cmd/process.go +++ b/cmd/process.go @@ -78,8 +78,7 @@ func Start(a *AppState) *cobra.Command { os.Exit(1) } - updateLatestHeight := 1 * time.Second - go c.TrackLatestBlockHeight(cmd.Context(), logger, updateLatestHeight) + go c.TrackLatestBlockHeight(cmd.Context(), logger) // wait until height is available for c.LatestBlock() == 0 { diff --git a/ethereum/listener.go b/ethereum/listener.go index 3d784e4..fdbb6dc 100644 --- a/ethereum/listener.go +++ b/ethereum/listener.go @@ -30,9 +30,6 @@ var ( // errSignal allows broadcasting an error value to multiple receivers. type errSignal struct { Ready chan struct{} - - Err error - Next *errSignal } func (e *Ethereum) StartListener( @@ -48,12 +45,12 @@ func (e *Ethereum) StartListener( messageTransmitter, err := content.ReadFile("abi/MessageTransmitter.json") if err != nil { - logger.Error("unable to read MessageTransmitter abi", "err", err) + logger.Error("Unable to read MessageTransmitter abi", "err", err) os.Exit(1) } messageTransmitterABI, err = abi.JSON(bytes.NewReader(messageTransmitter)) if err != nil { - logger.Error("unable to parse MessageTransmitter abi", "err", err) + logger.Error("Unable to parse MessageTransmitter abi", "err", err) os.Exit(1) } @@ -82,17 +79,17 @@ func (e *Ethereum) startListenerRoutines( go e.consumeStream(ctx, logger, stream, sig) consumeHistory(logger, history) - // get history from start-lookback up until latest block + // get history from (start block - lookback) up until latest block latestBlock := e.LatestBlock() start := latestBlock if e.startBlock != 0 { start = e.startBlock } startLookback := start - e.lookbackPeriod - logger.Info(fmt.Sprintf("getting history from %d: starting at:%d and looking back %d blocks", startLookback, start, e.lookbackPeriod)) + logger.Info(fmt.Sprintf("Getting history from %d: starting at: %d looking back %d blocks", startLookback, start, e.lookbackPeriod)) e.getAndConsumeHistory(ctx, logger, startLookback, latestBlock) - logger.Info("finished getting history") + logger.Info("Finished getting history") if flushInterval > 0 { go e.flushMechanism(ctx, logger, sig) @@ -105,7 +102,7 @@ func (e *Ethereum) startListenerRoutines( case <-ctx.Done(): return case err := <-sub.Err(): - logger.Error("websocket disconnected. Reconnecting...", "err", err) + logger.Error("Websocket disconnected. Reconnecting...", "err", err) close(sig.Ready) // restart @@ -143,7 +140,7 @@ func (e *Ethereum) startMainStream( // https://github.com/ethereum/go-ethereum/issues/15063 stream, sub, history, err = etherReader.QueryWithHistory(ctx, &query) if err != nil { - logger.Error("unable to subscribe to logs", "attempt", queryAttempt, "err", err) + logger.Error("Unable to subscribe to logs", "attempt", queryAttempt, "err", err) queryAttempt++ time.Sleep(1 * time.Second) continue @@ -178,7 +175,7 @@ func (e *Ethereum) getAndConsumeHistory( toBlock = end } - logger.Debug(fmt.Sprintf("looking back in chunks of %d: chunk: %d/%d start-block: %d end-block: %d", chunkSize, chunk, totalChunksNeeded, fromBlock, toBlock)) + logger.Debug(fmt.Sprintf("Looking back in chunks of %d: chunk: %d/%d start-block: %d end-block: %d", chunkSize, chunk, totalChunksNeeded, fromBlock, toBlock)) etherReader := etherstream.Reader{Backend: e.wsClient} @@ -192,7 +189,8 @@ func (e *Ethereum) getAndConsumeHistory( for { _, toUnSub, history, err = etherReader.QueryWithHistory(ctx, &query) if err != nil { - logger.Error("unable to query history from %d to %d. attempt: %d", start, end, queryAttempt) + // TODO: add metrics for this log + logger.Error(fmt.Sprintf("Unable to query history from %d to %d. attempt: %d", start, end, queryAttempt), "err", err) queryAttempt++ time.Sleep(1 * time.Second) continue @@ -233,14 +231,14 @@ func (e *Ethereum) consumeStream( stream <-chan ethtypes.Log, sig *errSignal, ) { - logger.Debug("consuming incoming messages") + logger.Info("Starting consumption of incoming stream") var txState *types.TxState for { select { case <-ctx.Done(): return case <-sig.Ready: - logger.Debug("websocket disconnected...stopped consuming stream") + logger.Debug("Websocket disconnected... Stopped consuming stream. Will restart after websocket is re-established") return case streamLog := <-stream: parsedMsg, err := types.EvmLogToMessageState(messageTransmitterABI, messageSent, &streamLog) @@ -272,7 +270,7 @@ func (e *Ethereum) flushMechanism( logger log.Logger, sig *errSignal, ) { - logger.Debug(fmt.Sprintf("flush mechanism started. Will flush every %v", flushInterval)) + logger.Info(fmt.Sprintf("Starting flush mechanism. Will flush every %v", flushInterval)) for { timer := time.NewTimer(flushInterval) @@ -286,18 +284,18 @@ func (e *Ethereum) flushMechanism( start := e.lastFlushedBlock - e.lookbackPeriod - logger.Info(fmt.Sprintf("flush started from %d to %d", start, latestBlock)) + logger.Info(fmt.Sprintf("Flush started from %d to %d", start, latestBlock)) e.getAndConsumeHistory(ctx, logger, start, latestBlock) e.lastFlushedBlock = latestBlock - logger.Info("flush complete") + logger.Info("Flush complete") // if main websocket stream is disconnected, stop flush. It will be restarted once websocket is reconnected case <-sig.Ready: timer.Stop() - logger.Debug("websocket disconnected. Flush stopped. Will restart after websocket is re-established") + logger.Debug("Websocket disconnected... Flush stopped. Will restart after websocket is re-established") return case <-ctx.Done(): timer.Stop() @@ -306,33 +304,31 @@ func (e *Ethereum) flushMechanism( } } -func (e *Ethereum) TrackLatestBlockHeight(ctx context.Context, logger log.Logger, loop time.Duration) { +func (e *Ethereum) TrackLatestBlockHeight(ctx context.Context, logger log.Logger) { logger.With("routine", "TrackLatestBlockHeight", "chain", e.name, "domain", e.domain) - // first time - header, err := e.rpcClient.HeaderByNumber(ctx, nil) + headers := make(chan *ethtypes.Header) + + sub, err := e.wsClient.SubscribeNewHead(ctx, headers) if err != nil { - logger.Error(fmt.Sprintf("error getting latest block height. Will retry in %.2f second:", loop.Seconds()), "err", err) - } - if err == nil { - e.SetLatestBlock(header.Number.Uint64()) + logger.Error("Failed to connect to websocket to track height. Will retry...", "err", err) + time.Sleep(1 * time.Second) + e.TrackLatestBlockHeight(ctx, logger) + return } - // then start loop on a timer + logger.Info("Height tracking websocket subscritpiton connected") + for { - timer := time.NewTimer(loop) select { - case <-timer.C: - header, err := e.rpcClient.HeaderByNumber(ctx, nil) - if err != nil { - logger.Debug(fmt.Sprintf("error getting latest block height. Will retry in %.2f second:", loop.Seconds()), "err", err) - continue - } - e.SetLatestBlock(header.Number.Uint64()) - case <-ctx.Done(): - timer.Stop() return + case err := <-sub.Err(): + logger.Error("Height tracker websocket subscritpiton error. Attempting to reconnect...", "err", err) + e.TrackLatestBlockHeight(ctx, logger) + return + case header := <-headers: + e.SetLatestBlock(header.Number.Uint64()) } } } @@ -356,7 +352,7 @@ func (e *Ethereum) WalletBalanceMetric(ctx context.Context, logger log.Logger, m timer.Stop() balance, err := e.rpcClient.BalanceAt(ctx, account, nil) if err != nil { - logger.Error(fmt.Sprintf("error querying balance. Will try again in %.2f sec", queryRate.Seconds()), "error", err) + logger.Error(fmt.Sprintf("Error querying balance. Will try again in %.2f sec", queryRate.Seconds()), "error", err) continue } @@ -367,7 +363,7 @@ func (e *Ethereum) WalletBalanceMetric(ctx context.Context, logger log.Logger, m case <-timer.C: balance, err := e.rpcClient.BalanceAt(ctx, account, nil) if err != nil { - logger.Error(fmt.Sprintf("error querying balance. Will try again in %.2f sec", queryRate.Seconds()), "error", err) + logger.Error(fmt.Sprintf("Error querying balance. Will try again in %.2f sec", queryRate.Seconds()), "error", err) continue } diff --git a/noble/broadcast.go b/noble/broadcast.go index 655ed4d..c3ec72f 100644 --- a/noble/broadcast.go +++ b/noble/broadcast.go @@ -99,11 +99,6 @@ func (n *Noble) attemptBroadcast( if used { msg.Status = types.Complete - // bm, _ := new(cctptypes.BurnMessage).Parse(msg.MsgBody) - // x, err := hex.DecodeString(string(bm.MintRecipient)) - // fmt.Println("err", err) - // y := common.HexToAddress(string(x)) - // fmt.Println("ERRR", err) logger.Info(fmt.Sprintf("Noble cctp minter nonce %d already used.", msg.Nonce), "src-tx", msg.SourceTxHash) continue } diff --git a/noble/listener.go b/noble/listener.go index cfe45c6..f47382d 100644 --- a/noble/listener.go +++ b/noble/listener.go @@ -94,7 +94,7 @@ func (n *Noble) StartListener( block := <-blockQueue res, err := n.cc.RPCClient.TxSearch(ctx, fmt.Sprintf("tx.height=%d", block), false, nil, nil, "") if err != nil || res == nil { - logger.Debug(fmt.Sprintf("unable to query Noble block %d. Will retry.", block), "error:", err) + logger.Debug(fmt.Sprintf("Unable to query Noble block %d. Will retry.", block), "error:", err) blockQueue <- block continue } @@ -102,7 +102,7 @@ func (n *Noble) StartListener( for _, tx := range res.Txs { parsedMsgs, err := txToMessageState(tx) if err != nil { - logger.Error("unable to parse Noble log to message state", "err", err.Error()) + logger.Error("Unable to parse Noble log to message state", "err", err.Error()) continue } for _, parsedMsg := range parsedMsgs { @@ -128,7 +128,7 @@ func (n *Noble) flushMechanism( blockQueue chan uint64, ) { - logger.Debug(fmt.Sprintf("flush mechanism started. Will flush every %v", flushInterval)) + logger.Debug(fmt.Sprintf("Flush mechanism started. Will flush every %v", flushInterval)) for { timer := time.NewTimer(flushInterval) @@ -139,11 +139,11 @@ func (n *Noble) flushMechanism( // test to see that the rpc is available before attempting flush res, err := n.cc.RPCClient.Status(ctx) if err != nil { - logger.Error(fmt.Sprintf("skipping flush... error reaching out to rpc, will retry flush in %v", flushInterval)) + logger.Error(fmt.Sprintf("Skipping flush... error reaching out to rpc, will retry flush in %v", flushInterval)) continue } if res.SyncInfo.CatchingUp { - logger.Error(fmt.Sprintf("skipping flush... rpc still catching, will retry flush in %v", flushInterval)) + logger.Error(fmt.Sprintf("Skipping flush... rpc still catching, will retry flush in %v", flushInterval)) continue } @@ -154,14 +154,14 @@ func (n *Noble) flushMechanism( flushStart := lastFlushedBlock - n.lookbackPeriod - logger.Info(fmt.Sprintf("flush started from: %d to: %d", flushStart, latestBlock)) + logger.Info(fmt.Sprintf("Flush started from: %d to: %d", flushStart, latestBlock)) for i := flushStart; i <= latestBlock; i++ { blockQueue <- i } n.lastFlushedBlock = latestBlock - logger.Info("flush complete") + logger.Info("Flush complete") case <-ctx.Done(): timer.Stop() @@ -170,24 +170,24 @@ func (n *Noble) flushMechanism( } } -func (n *Noble) TrackLatestBlockHeight(ctx context.Context, logger log.Logger, loop time.Duration) { +func (n *Noble) TrackLatestBlockHeight(ctx context.Context, logger log.Logger) { logger.With("routine", "TrackLatestBlockHeight", "chain", n.Name(), "domain", n.Domain()) // first time res, err := n.cc.RPCClient.Status(ctx) if err != nil { - logger.Error("unable to query Nobles latest height", "err", err) + logger.Error("Unable to query Nobles latest height", "err", err) } n.SetLatestBlock(uint64(res.SyncInfo.LatestBlockHeight)) // then start loop on a timer for { - timer := time.NewTimer(loop) + timer := time.NewTimer(6 * time.Second) select { case <-timer.C: res, err := n.cc.RPCClient.Status(ctx) if err != nil { - logger.Error("unable to query Nobles latest height", "err", err) + logger.Error("Unable to query Nobles latest height", "err", err) continue } n.SetLatestBlock(uint64(res.SyncInfo.LatestBlockHeight)) diff --git a/types/chain.go b/types/chain.go index 8501221..98a3873 100644 --- a/types/chain.go +++ b/types/chain.go @@ -65,7 +65,6 @@ type Chain interface { TrackLatestBlockHeight( ctx context.Context, logger log.Logger, - loop time.Duration, ) WalletBalanceMetric(