Skip to content

Commit

Permalink
Add codec + fix context
Browse files Browse the repository at this point in the history
  • Loading branch information
boojamya committed Jan 23, 2024
1 parent d4d198a commit 59e67f4
Show file tree
Hide file tree
Showing 10 changed files with 55 additions and 34 deletions.
10 changes: 3 additions & 7 deletions cmd/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
"time"

"cosmossdk.io/log"
Expand All @@ -29,11 +27,10 @@ var State = types.NewStateMap()
var sequenceMap = types.NewSequenceMap()

func Start(cmd *cobra.Command, args []string) {

// messageState processing queue
var processingQueue = make(chan *types.TxState, 10000)

sigTerm := make(chan os.Signal, 1)

registeredDomains := make(map[types.Domain]types.Chain)

for name, cfg := range Cfg.Chains {
Expand All @@ -48,7 +45,7 @@ func Start(cmd *cobra.Command, args []string) {
os.Exit(1)
}

go c.StartListener(cmd.Context(), Logger, processingQueue, sigTerm)
go c.StartListener(cmd.Context(), Logger, processingQueue)

if _, ok := registeredDomains[c.Domain()]; ok {
Logger.Error("Duplicate domain found", "domain", c.Domain())
Expand All @@ -63,8 +60,7 @@ func Start(cmd *cobra.Command, args []string) {
go StartProcessor(cmd.Context(), Cfg, Logger, registeredDomains, processingQueue, sequenceMap)
}

signal.Notify(sigTerm, os.Interrupt, syscall.SIGTERM)
<-sigTerm
<-cmd.Context().Done()
}

// StartProcessor is the main processing pipeline.
Expand Down
5 changes: 3 additions & 2 deletions cmd/root.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cmd

import (
"context"
"fmt"
"net/http"
"os"
Expand Down Expand Up @@ -30,8 +31,8 @@ var rootCmd = &cobra.Command{
Short: "A CLI tool for relaying CCTP messages",
}

func Execute() {
if err := rootCmd.Execute(); err != nil {
func Execute(ctx context.Context) {
if err := rootCmd.ExecuteContext(ctx); err != nil {
Logger.Error(err.Error())
os.Exit(1)
}
Expand Down
5 changes: 2 additions & 3 deletions cosmos/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,14 @@ import (
"github.com/cosmos/cosmos-sdk/codec/types"
"github.com/cosmos/cosmos-sdk/std"
"github.com/cosmos/cosmos-sdk/types/module"

//"github.com/cosmos/cosmos-sdk/x/auth"
"github.com/cosmos/cosmos-sdk/x/auth"
"github.com/cosmos/cosmos-sdk/x/auth/tx"
// authz "github.com/cosmos/cosmos-sdk/x/authz/module"
//"github.com/cosmos/cosmos-sdk/x/bank"
)

var ModuleBasics = []module.AppModuleBasic{
// auth.AppModuleBasic{},
auth.AppModuleBasic{},
// authz.AppModuleBasic{},
// bank.AppModuleBasic{},
cctp.AppModuleBasic{},
Expand Down
13 changes: 7 additions & 6 deletions ethereum/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ func (e *Ethereum) StartListener(
ctx context.Context,
logger log.Logger,
processingQueue chan *types.TxState,
quit chan os.Signal,
) {
logger = logger.With("chain", e.name, "chain_id", e.chainID, "domain", e.domain)

Expand All @@ -138,19 +137,19 @@ func (e *Ethereum) StartListener(

messageSent := messageTransmitterABI.Events["MessageSent"]

ethClient, err := ethclient.DialContext(context.Background(), e.wsURL)
ethClient, err := ethclient.DialContext(ctx, e.wsURL)
if err != nil {
logger.Error("unable to initialize ethereum client", "err", err)
os.Exit(1)
}

defer ethClient.Close()
// defer ethClient.Close()

messageTransmitterAddress := common.HexToAddress(e.messageTransmitterAddress)
etherReader := etherstream.Reader{Backend: ethClient}

if e.startBlock == 0 {
header, err := ethClient.HeaderByNumber(context.Background(), nil)
header, err := ethClient.HeaderByNumber(ctx, nil)
if err != nil {
logger.Error("unable to retrieve latest eth block header", "err", err)
os.Exit(1)
Expand All @@ -172,7 +171,7 @@ func (e *Ethereum) StartListener(

// websockets do not query history
// https://github.com/ethereum/go-ethereum/issues/15063
stream, sub, history, err := etherReader.QueryWithHistory(context.Background(), &query)
stream, sub, history, err := etherReader.QueryWithHistory(ctx, &query)
if err != nil {
logger.Error("unable to subscribe to logs", "err", err)
os.Exit(1)
Expand All @@ -199,10 +198,12 @@ func (e *Ethereum) StartListener(
var txState *types.TxState
for {
select {
case <-quit:
case <-ctx.Done():
ethClient.Close()
return
case err := <-sub.Err():
logger.Error("connection closed", "err", err)
ethClient.Close()
os.Exit(1)
case streamLog := <-stream:
parsedMsg, err := types.EvmLogToMessageState(messageTransmitterABI, messageSent, &streamLog)
Expand Down
5 changes: 4 additions & 1 deletion ethereum/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ func TestStartListener(t *testing.T) {
eth, err := ethCfg.Chain("ethereum")
require.NoError(t, err)

go eth.StartListener(context.TODO(), logger, processingQueue, nil)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go eth.StartListener(ctx, logger, processingQueue)

time.Sleep(5 * time.Second)

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
github.com/gin-gonic/gin v1.8.1
github.com/pascaldekloe/etherstream v0.1.0
google.golang.org/grpc v1.57.0
gopkg.in/yaml.v2 v2.4.0
)

require (
Expand Down Expand Up @@ -295,7 +296,6 @@ require (
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
honnef.co/go/tools v0.4.5 // indirect
mvdan.cc/interfacer v0.0.0-20180901003855-c20040233aed // indirect
mvdan.cc/lint v0.0.0-20170908181259-adc824a0674b // indirect
Expand Down
12 changes: 10 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
package main

import "github.com/strangelove-ventures/noble-cctp-relayer/cmd"
import (
"context"
"os"
"os/signal"

"github.com/strangelove-ventures/noble-cctp-relayer/cmd"
)

func main() {
cmd.Execute()
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()
cmd.Execute(ctx)
}
28 changes: 20 additions & 8 deletions noble/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"errors"
"fmt"
"math/rand"
"os"
"regexp"
"strconv"
"sync"
Expand Down Expand Up @@ -166,7 +165,6 @@ func (n *Noble) StartListener(
ctx context.Context,
logger log.Logger,
processingQueue chan *types.TxState,
quit chan os.Signal,
) {
logger = logger.With("chain", n.Name(), "chain_id", n.chainID, "domain", n.Domain())

Expand Down Expand Up @@ -205,11 +203,13 @@ func (n *Noble) StartListener(

// listen for new blocks
go func() {
first := make(chan struct{}, 1)
first <- struct{}{}
for {
timer := time.NewTimer(6 * time.Second)
select {
case <-quit:
return
default:
case <-first:
timer.Stop()
chainTip, err = n.chainTip(ctx)
if err == nil {
if chainTip >= currentBlock {
Expand All @@ -219,7 +219,19 @@ func (n *Noble) StartListener(
currentBlock = chainTip + 1
}
}
time.Sleep(6 * time.Second)
case <-timer.C:
chainTip, err = n.chainTip(ctx)
if err == nil {
if chainTip >= currentBlock {
for i := currentBlock; i <= chainTip; i++ {
blockQueue <- i
}
currentBlock = chainTip + 1
}
}
case <-ctx.Done():
timer.Stop()
return
}
}
}()
Expand All @@ -229,7 +241,7 @@ func (n *Noble) StartListener(
go func() {
for {
select {
case <-quit:
case <-ctx.Done():
return
default:
block := <-blockQueue
Expand All @@ -255,7 +267,7 @@ func (n *Noble) StartListener(
}()
}

<-quit
<-ctx.Done()
}

func (n *Noble) chainTip(ctx context.Context) (uint64, error) {
Expand Down
7 changes: 5 additions & 2 deletions noble/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ var processingQueue chan *types.TxState

func init() {
var err error
cfg, err = cmd.Parse("../../.ignore/unit_tests.yaml")
cfg, err = cmd.Parse("../.ignore/testnet.yaml")
if err != nil {
panic(err)
}
Expand All @@ -35,7 +35,10 @@ func TestStartListener(t *testing.T) {
n, err := cfg.Chains["noble"].(*noble.ChainConfig).Chain("noble")
require.NoError(t, err)

go n.StartListener(context.TODO(), logger, processingQueue, nil)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go n.StartListener(ctx, logger, processingQueue)

time.Sleep(20 * time.Second)

Expand Down
2 changes: 0 additions & 2 deletions types/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package types

import (
"context"
"os"

"cosmossdk.io/log"
)
Expand Down Expand Up @@ -30,7 +29,6 @@ type Chain interface {
ctx context.Context,
logger log.Logger,
processingQueue chan *TxState,
quit chan os.Signal,
)

// Broadcast broadcasts CCTP mint messages to the chain.
Expand Down

0 comments on commit 59e67f4

Please sign in to comment.