Skip to content

Commit

Permalink
appstate
Browse files Browse the repository at this point in the history
  • Loading branch information
boojamya committed Feb 27, 2024
1 parent e81231e commit f3d1443
Show file tree
Hide file tree
Showing 23 changed files with 380 additions and 1,009 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
.idea
noble-cctp-relayer
.ignore
.ignore
.env
93 changes: 93 additions & 0 deletions cmd/appstate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package cmd

import (
"fmt"
"os"

"cosmossdk.io/log"
"github.com/rs/zerolog"
"github.com/strangelove-ventures/noble-cctp-relayer/ethereum"
"github.com/strangelove-ventures/noble-cctp-relayer/noble"
"github.com/strangelove-ventures/noble-cctp-relayer/types"
"gopkg.in/yaml.v2"
)

// appState is the modifiable state of the application.
type AppState struct {
Config *types.Config

ConfigPath string

Debug bool

Logger log.Logger
}

func NewappState() *AppState {
return &AppState{}
}

func (a *AppState) InitLogger() {
if a.Debug {
a.Logger = log.NewLogger(os.Stdout)
} else {
a.Logger = log.NewLogger(os.Stdout, log.LevelOption(zerolog.InfoLevel))
}
}

// loadConfigFile reads config file into a.Config if file is present.
func (a *AppState) loadConfigFile() error {
config, err := ParseConfig(a.ConfigPath)
if err != nil {
a.Logger.Error("unable to parse config file", "location", a.ConfigPath, "err", err)
os.Exit(1)
}
a.Logger.Info("successfully parsed config file", "location", a.ConfigPath)
a.Config = config

return nil
}

// ParseConfig parses the app config file
func ParseConfig(file string) (*types.Config, error) {
data, err := os.ReadFile(file)
if err != nil {
return nil, fmt.Errorf("failed to read file %w", err)
}

var cfg types.ConfigWrapper
if err := yaml.Unmarshal(data, &cfg); err != nil {
return nil, fmt.Errorf("error unmarshalling config: %w", err)
}

c := types.Config{
EnabledRoutes: cfg.EnabledRoutes,
Circle: cfg.Circle,
ProcessorWorkerCount: cfg.ProcessorWorkerCount,
Api: cfg.Api,
Chains: make(map[string]types.ChainConfig),
}

for name, chain := range cfg.Chains {
yamlbz, err := yaml.Marshal(chain)
if err != nil {
return nil, err
}

switch name {
case "noble":
var cc noble.ChainConfig
if err := yaml.Unmarshal(yamlbz, &cc); err != nil {
return nil, err
}
c.Chains[name] = &cc
default:
var cc ethereum.ChainConfig
if err := yaml.Unmarshal(yamlbz, &cc); err != nil {
return nil, err
}
c.Chains[name] = &cc
}
}
return &c, err
}
4 changes: 2 additions & 2 deletions cmd/root_test.go → cmd/appstate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

func TestConfig(t *testing.T) {
file, err := cmd.Parse("../config/sample-config.yaml")
file, err := cmd.ParseConfig("../config/sample-config.yaml")
require.NoError(t, err, "Error parsing config")

// assert noble chainConfig correctly parsed
Expand All @@ -25,7 +25,7 @@ func TestConfig(t *testing.T) {
}

func TestBlockQueueChannelSize(t *testing.T) {
file, err := cmd.Parse("../config/sample-config.yaml")
file, err := cmd.ParseConfig("../config/sample-config.yaml")
require.NoError(t, err, "Error parsing config")

var nobleCfg any = file.Chains["noble"]
Expand Down
128 changes: 43 additions & 85 deletions cmd/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,9 @@ import (

"cosmossdk.io/log"
"github.com/gin-gonic/gin"
"github.com/rs/zerolog"
"github.com/spf13/cobra"
"github.com/strangelove-ventures/noble-cctp-relayer/circle"
"github.com/strangelove-ventures/noble-cctp-relayer/ethereum"
"github.com/strangelove-ventures/noble-cctp-relayer/noble"
"github.com/strangelove-ventures/noble-cctp-relayer/types"
"gopkg.in/yaml.v2"
)

// State and Store map the iris api lookup id -> MessageState
Expand All @@ -27,65 +23,72 @@ var State = types.NewStateMap()
// SequenceMap maps the domain -> the equivalent minter account sequence or nonce
var sequenceMap = types.NewSequenceMap()

func Start() *cobra.Command {
func Start(a *AppState) *cobra.Command {
cmd := &cobra.Command{
Use: "start",
Short: "Start relaying CCTP transactions",
Run: func(cmd *cobra.Command, args []string) {
logger := a.Logger
cfg := a.Config

startApi(a)

// messageState processing queue
var processingQueue = make(chan *types.TxState, 10000)

registeredDomains := make(map[types.Domain]types.Chain)

for name, cfg := range Cfg.Chains {
for name, cfg := range cfg.Chains {
c, err := cfg.Chain(name)
if err != nil {
Logger.Error("Error creating chain", "err: ", err)
logger.Error("Error creating chain", "err: ", err)
os.Exit(1)
}

if err := c.InitializeBroadcaster(cmd.Context(), Logger, sequenceMap); err != nil {
Logger.Error("Error initializing broadcaster", "err: ", err)
if err := c.InitializeBroadcaster(cmd.Context(), logger, sequenceMap); err != nil {
logger.Error("Error initializing broadcaster", "err: ", err)
os.Exit(1)
}

go c.StartListener(cmd.Context(), Logger, processingQueue)
go c.StartListener(cmd.Context(), logger, processingQueue)

if _, ok := registeredDomains[c.Domain()]; ok {
Logger.Error("Duplicate domain found", "domain", c.Domain(), "name:", c.Name())
logger.Error("Duplicate domain found", "domain", c.Domain(), "name:", c.Name())
os.Exit(1)
}

registeredDomains[c.Domain()] = c
}

// spin up Processor worker pool
for i := 0; i < int(Cfg.ProcessorWorkerCount); i++ {
go StartProcessor(cmd.Context(), Cfg, Logger, registeredDomains, processingQueue, sequenceMap)
for i := 0; i < int(cfg.ProcessorWorkerCount); i++ {
go StartProcessor(cmd.Context(), a, registeredDomains, processingQueue, sequenceMap)
}

<-cmd.Context().Done()
},
}
cmd.Flags().StringVarP(&cfgFile, "config", "c", "config.yaml", "file path of config file")
cmd.Flags().BoolVarP(&verbose, "verbose", "v", false, "use this flag to set log level to `debug`")
// cmd.Flags().StringVarP(&cfgFile, "config", "c", "config.yaml", "file path of config file")
// cmd.Flags().BoolVarP(&verbose, "verbose", "v", false, "use this flag to set log level to `debug`")
return cmd
}

// StartProcessor is the main processing pipeline.
func StartProcessor(
ctx context.Context,
cfg *types.Config,
logger log.Logger,
a *AppState,
registeredDomains map[types.Domain]types.Chain,
processingQueue chan *types.TxState,
sequenceMap *types.SequenceMap,
) {
logger := a.Logger
cfg := a.Config

for {
dequeuedTx := <-processingQueue

// if this is the first time seeing this message, add it to the State
State.Mu.Lock()
tx, ok := State.Load(LookupKey(dequeuedTx.TxHash))
if !ok {
State.Store(LookupKey(dequeuedTx.TxHash), dequeuedTx)
Expand All @@ -94,6 +97,7 @@ func StartProcessor(
msg.Status = types.Created
}
}
State.Mu.Unlock()

var broadcastMsgs = make(map[types.Domain][]*types.MessageState)
var requeue bool
Expand All @@ -102,7 +106,9 @@ func StartProcessor(
// if a filter's condition is met, mark as filtered
if FilterDisabledCCTPRoutes(cfg, logger, msg) ||
filterInvalidDestinationCallers(registeredDomains, logger, msg) {
State.Mu.Lock()
msg.Status = types.Filtered
State.Mu.Unlock()
}

// if the message is burned or pending, check for an attestation
Expand All @@ -111,8 +117,10 @@ func StartProcessor(
if response != nil {
if msg.Status == types.Created && response.Status == "pending_confirmations" {
logger.Debug("Attestation is created but still pending confirmations for 0x" + msg.IrisLookupId + ". Retrying...")
State.Mu.Lock()
msg.Status = types.Pending
msg.Updated = time.Now()
State.Mu.Unlock()
time.Sleep(10 * time.Second)
requeue = true
continue
Expand All @@ -123,10 +131,12 @@ func StartProcessor(
continue
} else if response.Status == "complete" {
logger.Debug("Attestation is complete for 0x" + msg.IrisLookupId + ". Retrying...")
State.Mu.Lock()
msg.Status = types.Attested
msg.Attestation = response.Attestation
msg.Updated = time.Now()
broadcastMsgs[msg.DestDomain] = append(broadcastMsgs[msg.DestDomain], msg)
State.Mu.Unlock()
}
} else {
// add attestation retry intervals per domain here
Expand All @@ -152,10 +162,12 @@ func StartProcessor(
continue
}

State.Mu.Lock()
for _, msg := range msgs {
msg.Status = types.Complete
msg.Updated = time.Now()
}
State.Mu.Unlock()

}
if requeue {
Expand Down Expand Up @@ -190,43 +202,32 @@ func filterInvalidDestinationCallers(registeredDomains map[types.Domain]types.Ch
logger.Error("No chain registered for domain", "domain", msg.DestDomain)
return true
}
validCaller := chain.IsDestinationCaller(msg.DestinationCaller)

if validCaller {
// we do not want to filter this message if valid caller
return false
}

return !chain.IsDestinationCaller(msg.DestinationCaller)
logger.Info(fmt.Sprintf("Filtered tx %s from %d to %d due to destination caller: %s)",
msg.SourceTxHash, msg.SourceDomain, msg.DestDomain, msg.DestinationCaller))
return true
}

func LookupKey(sourceTxHash string) string {
// return fmt.Sprintf("%s-%s", sourceTxHash, messageType)
return sourceTxHash
}

func init() {
cobra.OnInitialize(func() {
if verbose {
Logger = log.NewLogger(os.Stdout)
} else {
Logger = log.NewLogger(os.Stdout, log.LevelOption(zerolog.InfoLevel))
}

var err error
Cfg, err = Parse(cfgFile)
if err != nil {
Logger.Error("unable to parse config file", "location", cfgFile, "err", err)
os.Exit(1)
}
Logger.Info("successfully parsed config file", "location", cfgFile)

// start api server
go startApi()
})
}

func startApi() {
func startApi(a *AppState) {
logger := a.Logger
cfg := a.Config
gin.SetMode(gin.ReleaseMode)
router := gin.Default()

err := router.SetTrustedProxies(Cfg.Api.TrustedProxies) // vpn.primary.strange.love
err := router.SetTrustedProxies(cfg.Api.TrustedProxies) // vpn.primary.strange.love
if err != nil {
Logger.Error("unable to set trusted proxies on API server: " + err.Error())
logger.Error("unable to set trusted proxies on API server: " + err.Error())
os.Exit(1)
}

Expand All @@ -250,46 +251,3 @@ func getTxByHash(c *gin.Context) {

c.JSON(http.StatusNotFound, gin.H{"message": "message not found"})
}

func Parse(file string) (*types.Config, error) {
data, err := os.ReadFile(file)
if err != nil {
return nil, fmt.Errorf("failed to read file %w", err)
}

var cfg types.ConfigWrapper
if err := yaml.Unmarshal(data, &cfg); err != nil {
return nil, fmt.Errorf("error unmarshalling config: %w", err)
}

c := types.Config{
EnabledRoutes: cfg.EnabledRoutes,
Circle: cfg.Circle,
ProcessorWorkerCount: cfg.ProcessorWorkerCount,
Api: cfg.Api,
Chains: make(map[string]types.ChainConfig),
}

for name, chain := range cfg.Chains {
yamlbz, err := yaml.Marshal(chain)
if err != nil {
return nil, err
}

switch name {
case "noble":
var cc noble.ChainConfig
if err := yaml.Unmarshal(yamlbz, &cc); err != nil {
return nil, err
}
c.Chains[name] = &cc
default:
var cc ethereum.ChainConfig
if err := yaml.Unmarshal(yamlbz, &cc); err != nil {
return nil, err
}
c.Chains[name] = &cc
}
}
return &c, err
}
Loading

0 comments on commit f3d1443

Please sign in to comment.