From d73d1469a877588c119fc3fbd0eb31834d826c7a Mon Sep 17 00:00:00 2001 From: nick Date: Wed, 29 May 2024 18:28:31 +0900 Subject: [PATCH] wip --- node/pkg/wfetcher/binance/binance.go | 2 +- node/pkg/wfetcher/binance/utils.go | 17 ----------------- node/pkg/wfetcher/coinbase/coinbase.go | 19 +++++++++++++++++-- node/pkg/wfetcher/coinbase/utils.go | 21 ++++----------------- node/pkg/wfetcher/coinone/coinone.go | 2 +- node/pkg/wfetcher/coinone/utils.go | 18 ------------------ node/pkg/wfetcher/korbit/korbit.go | 2 +- node/pkg/wfetcher/korbit/utils.go | 18 ------------------ node/pkg/wfetcher/utils.go | 15 +++++++++++++++ node/script/test_websocket/main.go | 8 ++++---- 10 files changed, 43 insertions(+), 79 deletions(-) diff --git a/node/pkg/wfetcher/binance/binance.go b/node/pkg/wfetcher/binance/binance.go index 596465e2a..1ee2085b8 100644 --- a/node/pkg/wfetcher/binance/binance.go +++ b/node/pkg/wfetcher/binance/binance.go @@ -47,7 +47,7 @@ func New(ctx context.Context, opts ...wfetcher.FetcherOption) (*BinanceFetcher, } func (b *BinanceFetcher) handleMessage(ctx context.Context, message map[string]any) error { - ticker, err := MessageToTicker(message) + ticker, err := wfetcher.MessageToStruct[MiniTicker](message) if err != nil { log.Error().Str("Player", "Binance").Err(err).Msg("error in MessageToTicker") return err diff --git a/node/pkg/wfetcher/binance/utils.go b/node/pkg/wfetcher/binance/utils.go index 6979808f9..5d5548a8e 100644 --- a/node/pkg/wfetcher/binance/utils.go +++ b/node/pkg/wfetcher/binance/utils.go @@ -1,12 +1,10 @@ package binance import ( - "encoding/json" "fmt" "time" "bisonai.com/orakl/node/pkg/wfetcher" - "github.com/rs/zerolog/log" ) func GetSubscription(names []string) Subscription { @@ -22,21 +20,6 @@ func GetSubscription(names []string) Subscription { Id: 1, } } -func MessageToTicker(msg map[string]any) (MiniTicker, error) { - var miniTicker MiniTicker - jsonData, err := json.Marshal(msg) - if err != nil { - log.Error().Str("Player", "Binance").Err(err).Msg("error in json.Marshal") - return miniTicker, err - } - - err = json.Unmarshal(jsonData, &miniTicker) - if err != nil { - log.Error().Str("Player", "Binance").Err(err).Msg("error in json.Unmarshal") - return miniTicker, err - } - return miniTicker, nil -} func TickerToFeedData(miniTicker MiniTicker, feedMap map[string]int32) (*wfetcher.FeedData, error) { feedData := new(wfetcher.FeedData) diff --git a/node/pkg/wfetcher/coinbase/coinbase.go b/node/pkg/wfetcher/coinbase/coinbase.go index 0e8844ecc..2e8956449 100644 --- a/node/pkg/wfetcher/coinbase/coinbase.go +++ b/node/pkg/wfetcher/coinbase/coinbase.go @@ -51,16 +51,31 @@ func New(ctx context.Context, opts ...wfetcher.FetcherOption) (*CoinbaseFetcher, } func (c *CoinbaseFetcher) handleMessage(ctx context.Context, message map[string]any) error { - ticker, err := MessageToTicker(message) + ticker, err := wfetcher.MessageToStruct[Ticker](message) if err != nil { return err } + fmt.Println(ticker) + + if ticker.Type != "ticker" { + return nil + } feedData, err := TickerToFeedData(ticker, c.FeedMap) if err != nil { return err } - fmt.Println(feedData) + fmt.Println(feedData.String()) + + // err = wfetcher.StoreFeed(ctx, &feedData, 2*time.Second) + // if err != nil { + // return err + // } + // return nil return nil } + +func (k *CoinbaseFetcher) Run(ctx context.Context) { + k.Ws.Run(ctx, k.handleMessage) +} diff --git a/node/pkg/wfetcher/coinbase/utils.go b/node/pkg/wfetcher/coinbase/utils.go index 5ea1d9181..19ec28c1a 100644 --- a/node/pkg/wfetcher/coinbase/utils.go +++ b/node/pkg/wfetcher/coinbase/utils.go @@ -1,37 +1,24 @@ package coinbase import ( - "encoding/json" "fmt" + "strings" "time" "bisonai.com/orakl/node/pkg/wfetcher" + "github.com/rs/zerolog/log" ) -func MessageToTicker(msg map[string]any) (Ticker, error) { - var ticker Ticker - jsonData, err := json.Marshal(msg) - if err != nil { - return ticker, err - } - - err = json.Unmarshal(jsonData, &ticker) - if err != nil { - return ticker, err - } - - return ticker, nil -} - func TickerToFeedData(ticker Ticker, feedMap map[string]int32) (wfetcher.FeedData, error) { feedData := wfetcher.FeedData{} timestamp, err := time.Parse(time.RFC3339Nano, ticker.Time) if err != nil { + log.Warn().Err(err).Msg("error in parsing time") timestamp = time.Now() } - id, exists := feedMap[ticker.ProductID] + id, exists := feedMap[strings.ToUpper(ticker.ProductID)] if !exists { return feedData, fmt.Errorf("feed not found") } diff --git a/node/pkg/wfetcher/coinone/coinone.go b/node/pkg/wfetcher/coinone/coinone.go index 90a033235..281f44570 100644 --- a/node/pkg/wfetcher/coinone/coinone.go +++ b/node/pkg/wfetcher/coinone/coinone.go @@ -62,7 +62,7 @@ func New(ctx context.Context, opts ...wfetcher.FetcherOption) (*CoinoneFetcher, } func (c *CoinoneFetcher) handleMessage(ctx context.Context, message map[string]any) error { - raw, err := MessageToRawResponse(message) + raw, err := wfetcher.MessageToStruct[Raw](message) if err != nil { log.Error().Str("Player", "Coinone").Err(err).Msg("error in MessageToRawResponse") return err diff --git a/node/pkg/wfetcher/coinone/utils.go b/node/pkg/wfetcher/coinone/utils.go index 91bb4a69b..3d8d011ce 100644 --- a/node/pkg/wfetcher/coinone/utils.go +++ b/node/pkg/wfetcher/coinone/utils.go @@ -1,31 +1,13 @@ package coinone import ( - "encoding/json" "fmt" "strings" "time" "bisonai.com/orakl/node/pkg/wfetcher" - "github.com/rs/zerolog/log" ) -func MessageToRawResponse(msg map[string]any) (Raw, error) { - var rawResponse Raw - jsonData, err := json.Marshal(msg) - if err != nil { - log.Error().Str("Player", "Coinone").Err(err).Msg("error in json.Marshal") - return rawResponse, err - } - - err = json.Unmarshal(jsonData, &rawResponse) - if err != nil { - log.Error().Str("Player", "Coinone").Err(err).Msg("error in json.Unmarshal") - return rawResponse, err - } - return rawResponse, nil -} - func DataToFeedData(data Data, feedMap map[string]int32) (wfetcher.FeedData, error) { feedData := wfetcher.FeedData{} diff --git a/node/pkg/wfetcher/korbit/korbit.go b/node/pkg/wfetcher/korbit/korbit.go index 2eaae52a3..1dc4bee1b 100644 --- a/node/pkg/wfetcher/korbit/korbit.go +++ b/node/pkg/wfetcher/korbit/korbit.go @@ -63,7 +63,7 @@ func New(ctx context.Context, opts ...wfetcher.FetcherOption) (*KorbitFetcher, e } func (k *KorbitFetcher) handleMessage(ctx context.Context, message map[string]any) error { - raw, err := MessageToRawResponse(message) + raw, err := wfetcher.MessageToStruct[Raw](message) if err != nil { log.Error().Str("Player", "Korbit").Err(err).Msg("error in MessageToRawResponse") return err diff --git a/node/pkg/wfetcher/korbit/utils.go b/node/pkg/wfetcher/korbit/utils.go index 7a2638d0d..d7165d9d8 100644 --- a/node/pkg/wfetcher/korbit/utils.go +++ b/node/pkg/wfetcher/korbit/utils.go @@ -1,31 +1,13 @@ package korbit import ( - "encoding/json" "fmt" "strings" "time" "bisonai.com/orakl/node/pkg/wfetcher" - "github.com/rs/zerolog/log" ) -func MessageToRawResponse(msg map[string]any) (Raw, error) { - var rawResponse Raw - jsonData, err := json.Marshal(msg) - if err != nil { - log.Error().Str("Player", "Korbit").Err(err).Msg("error in json.Marshal") - return rawResponse, err - } - - err = json.Unmarshal(jsonData, &rawResponse) - if err != nil { - log.Error().Str("Player", "Korbit").Err(err).Msg("error in json.Unmarshal") - return rawResponse, err - } - return rawResponse, nil -} - func DataToFeedData(data Ticker, feedMap map[string]int32) (wfetcher.FeedData, error) { feedData := wfetcher.FeedData{} diff --git a/node/pkg/wfetcher/utils.go b/node/pkg/wfetcher/utils.go index 924c051f1..46d90357f 100644 --- a/node/pkg/wfetcher/utils.go +++ b/node/pkg/wfetcher/utils.go @@ -76,3 +76,18 @@ func PriceStringToFloat64(price string) (float64, error) { return f * float64(math.Pow10(int(DECIMALS))), nil } + +func MessageToStruct[T any](message map[string]any) (T, error) { + var result T + data, err := json.Marshal(message) + if err != nil { + return result, err + } + + err = json.Unmarshal(data, &result) + if err != nil { + return result, err + } + + return result, nil +} diff --git a/node/script/test_websocket/main.go b/node/script/test_websocket/main.go index 11ed49c46..024e06698 100644 --- a/node/script/test_websocket/main.go +++ b/node/script/test_websocket/main.go @@ -5,18 +5,18 @@ import ( "sync" "bisonai.com/orakl/node/pkg/wfetcher" - "bisonai.com/orakl/node/pkg/wfetcher/korbit" + "bisonai.com/orakl/node/pkg/wfetcher/coinbase" ) func main() { ctx := context.Background() var wg sync.WaitGroup feedMap := make(map[string]int32) - feedMap["BTC-KRW"] = 1 - feedMap["ETH-KRW"] = 2 + feedMap["ADA-USDT"] = 1 + feedMap["ETH-USDT"] = 2 wg.Add(1) - c, err := korbit.New(ctx, wfetcher.WithFeedMap(feedMap)) + c, err := coinbase.New(ctx, wfetcher.WithFeedMap(feedMap)) if err != nil { panic(err) }