Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
nick-bisonai committed May 29, 2024
1 parent be3eebf commit d73d146
Show file tree
Hide file tree
Showing 10 changed files with 43 additions and 79 deletions.
2 changes: 1 addition & 1 deletion node/pkg/wfetcher/binance/binance.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func New(ctx context.Context, opts ...wfetcher.FetcherOption) (*BinanceFetcher,
}

func (b *BinanceFetcher) handleMessage(ctx context.Context, message map[string]any) error {
ticker, err := MessageToTicker(message)
ticker, err := wfetcher.MessageToStruct[MiniTicker](message)
if err != nil {
log.Error().Str("Player", "Binance").Err(err).Msg("error in MessageToTicker")
return err
Expand Down
17 changes: 0 additions & 17 deletions node/pkg/wfetcher/binance/utils.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package binance

import (
"encoding/json"
"fmt"
"time"

"bisonai.com/orakl/node/pkg/wfetcher"
"github.com/rs/zerolog/log"
)

func GetSubscription(names []string) Subscription {
Expand All @@ -22,21 +20,6 @@ func GetSubscription(names []string) Subscription {
Id: 1,
}
}
func MessageToTicker(msg map[string]any) (MiniTicker, error) {
var miniTicker MiniTicker
jsonData, err := json.Marshal(msg)
if err != nil {
log.Error().Str("Player", "Binance").Err(err).Msg("error in json.Marshal")
return miniTicker, err
}

err = json.Unmarshal(jsonData, &miniTicker)
if err != nil {
log.Error().Str("Player", "Binance").Err(err).Msg("error in json.Unmarshal")
return miniTicker, err
}
return miniTicker, nil
}

func TickerToFeedData(miniTicker MiniTicker, feedMap map[string]int32) (*wfetcher.FeedData, error) {
feedData := new(wfetcher.FeedData)
Expand Down
19 changes: 17 additions & 2 deletions node/pkg/wfetcher/coinbase/coinbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,31 @@ func New(ctx context.Context, opts ...wfetcher.FetcherOption) (*CoinbaseFetcher,
}

func (c *CoinbaseFetcher) handleMessage(ctx context.Context, message map[string]any) error {
ticker, err := MessageToTicker(message)
ticker, err := wfetcher.MessageToStruct[Ticker](message)
if err != nil {
return err
}
fmt.Println(ticker)

if ticker.Type != "ticker" {
return nil
}

feedData, err := TickerToFeedData(ticker, c.FeedMap)
if err != nil {
return err
}

fmt.Println(feedData)
fmt.Println(feedData.String())

// err = wfetcher.StoreFeed(ctx, &feedData, 2*time.Second)
// if err != nil {
// return err
// }
// return nil
return nil
}

func (k *CoinbaseFetcher) Run(ctx context.Context) {
k.Ws.Run(ctx, k.handleMessage)
}
21 changes: 4 additions & 17 deletions node/pkg/wfetcher/coinbase/utils.go
Original file line number Diff line number Diff line change
@@ -1,37 +1,24 @@
package coinbase

import (
"encoding/json"
"fmt"
"strings"
"time"

"bisonai.com/orakl/node/pkg/wfetcher"
"github.com/rs/zerolog/log"
)

func MessageToTicker(msg map[string]any) (Ticker, error) {
var ticker Ticker
jsonData, err := json.Marshal(msg)
if err != nil {
return ticker, err
}

err = json.Unmarshal(jsonData, &ticker)
if err != nil {
return ticker, err
}

return ticker, nil
}

func TickerToFeedData(ticker Ticker, feedMap map[string]int32) (wfetcher.FeedData, error) {
feedData := wfetcher.FeedData{}

timestamp, err := time.Parse(time.RFC3339Nano, ticker.Time)
if err != nil {
log.Warn().Err(err).Msg("error in parsing time")
timestamp = time.Now()
}

id, exists := feedMap[ticker.ProductID]
id, exists := feedMap[strings.ToUpper(ticker.ProductID)]
if !exists {
return feedData, fmt.Errorf("feed not found")
}
Expand Down
2 changes: 1 addition & 1 deletion node/pkg/wfetcher/coinone/coinone.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func New(ctx context.Context, opts ...wfetcher.FetcherOption) (*CoinoneFetcher,
}

func (c *CoinoneFetcher) handleMessage(ctx context.Context, message map[string]any) error {
raw, err := MessageToRawResponse(message)
raw, err := wfetcher.MessageToStruct[Raw](message)
if err != nil {
log.Error().Str("Player", "Coinone").Err(err).Msg("error in MessageToRawResponse")
return err
Expand Down
18 changes: 0 additions & 18 deletions node/pkg/wfetcher/coinone/utils.go
Original file line number Diff line number Diff line change
@@ -1,31 +1,13 @@
package coinone

import (
"encoding/json"
"fmt"
"strings"
"time"

"bisonai.com/orakl/node/pkg/wfetcher"
"github.com/rs/zerolog/log"
)

func MessageToRawResponse(msg map[string]any) (Raw, error) {
var rawResponse Raw
jsonData, err := json.Marshal(msg)
if err != nil {
log.Error().Str("Player", "Coinone").Err(err).Msg("error in json.Marshal")
return rawResponse, err
}

err = json.Unmarshal(jsonData, &rawResponse)
if err != nil {
log.Error().Str("Player", "Coinone").Err(err).Msg("error in json.Unmarshal")
return rawResponse, err
}
return rawResponse, nil
}

func DataToFeedData(data Data, feedMap map[string]int32) (wfetcher.FeedData, error) {
feedData := wfetcher.FeedData{}

Expand Down
2 changes: 1 addition & 1 deletion node/pkg/wfetcher/korbit/korbit.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func New(ctx context.Context, opts ...wfetcher.FetcherOption) (*KorbitFetcher, e
}

func (k *KorbitFetcher) handleMessage(ctx context.Context, message map[string]any) error {
raw, err := MessageToRawResponse(message)
raw, err := wfetcher.MessageToStruct[Raw](message)
if err != nil {
log.Error().Str("Player", "Korbit").Err(err).Msg("error in MessageToRawResponse")
return err
Expand Down
18 changes: 0 additions & 18 deletions node/pkg/wfetcher/korbit/utils.go
Original file line number Diff line number Diff line change
@@ -1,31 +1,13 @@
package korbit

import (
"encoding/json"
"fmt"
"strings"
"time"

"bisonai.com/orakl/node/pkg/wfetcher"
"github.com/rs/zerolog/log"
)

func MessageToRawResponse(msg map[string]any) (Raw, error) {
var rawResponse Raw
jsonData, err := json.Marshal(msg)
if err != nil {
log.Error().Str("Player", "Korbit").Err(err).Msg("error in json.Marshal")
return rawResponse, err
}

err = json.Unmarshal(jsonData, &rawResponse)
if err != nil {
log.Error().Str("Player", "Korbit").Err(err).Msg("error in json.Unmarshal")
return rawResponse, err
}
return rawResponse, nil
}

func DataToFeedData(data Ticker, feedMap map[string]int32) (wfetcher.FeedData, error) {
feedData := wfetcher.FeedData{}

Expand Down
15 changes: 15 additions & 0 deletions node/pkg/wfetcher/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,18 @@ func PriceStringToFloat64(price string) (float64, error) {

return f * float64(math.Pow10(int(DECIMALS))), nil
}

func MessageToStruct[T any](message map[string]any) (T, error) {
var result T
data, err := json.Marshal(message)
if err != nil {
return result, err
}

err = json.Unmarshal(data, &result)
if err != nil {
return result, err
}

return result, nil
}
8 changes: 4 additions & 4 deletions node/script/test_websocket/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,18 @@ import (
"sync"

"bisonai.com/orakl/node/pkg/wfetcher"
"bisonai.com/orakl/node/pkg/wfetcher/korbit"
"bisonai.com/orakl/node/pkg/wfetcher/coinbase"
)

func main() {
ctx := context.Background()
var wg sync.WaitGroup
feedMap := make(map[string]int32)
feedMap["BTC-KRW"] = 1
feedMap["ETH-KRW"] = 2
feedMap["ADA-USDT"] = 1
feedMap["ETH-USDT"] = 2

wg.Add(1)
c, err := korbit.New(ctx, wfetcher.WithFeedMap(feedMap))
c, err := coinbase.New(ctx, wfetcher.WithFeedMap(feedMap))
if err != nil {
panic(err)
}
Expand Down

0 comments on commit d73d146

Please sign in to comment.