diff --git a/node/pkg/wfetcher/coinbase/coinbase.go b/node/pkg/wfetcher/coinbase/coinbase.go new file mode 100644 index 000000000..0e8844ecc --- /dev/null +++ b/node/pkg/wfetcher/coinbase/coinbase.go @@ -0,0 +1,66 @@ +package coinbase + +import ( + "context" + "fmt" + + "bisonai.com/orakl/node/pkg/wfetcher" + "bisonai.com/orakl/node/pkg/wss" + "github.com/rs/zerolog/log" +) + +type CoinbaseFetcher wfetcher.Fetcher + +// expected to recieve feedmap with key having format "-" +func New(ctx context.Context, opts ...wfetcher.FetcherOption) (*CoinbaseFetcher, error) { + config := &wfetcher.FetcherConfig{} + for _, opt := range opts { + opt(config) + } + + fetcher := &CoinbaseFetcher{} + + if len(config.FeedMap) == 0 { + log.Error().Str("Player", "Coinbase").Msg("no feed map") + return nil, fmt.Errorf("no feed map") + } + fetcher.FeedMap = config.FeedMap + + pairListString := []string{} + for feed := range config.FeedMap { + pairListString = append(pairListString, feed) + } + + subscription := Subscription{ + Type: "subscribe", + ProductIds: pairListString, + Channels: []string{"ticker"}, + } + + ws, err := wss.NewWebsocketHelper(ctx, + wss.WithEndpoint(URL), + wss.WithSubscriptions([]any{subscription}), + wss.WithProxyUrl(config.Proxy)) + if err != nil { + log.Error().Str("Player", "Coinbase").Err(err).Msg("error in NewWebsocketHelper") + return nil, err + } + + fetcher.Ws = ws + return fetcher, nil +} + +func (c *CoinbaseFetcher) handleMessage(ctx context.Context, message map[string]any) error { + ticker, err := MessageToTicker(message) + if err != nil { + return err + } + + feedData, err := TickerToFeedData(ticker, c.FeedMap) + if err != nil { + return err + } + + fmt.Println(feedData) + return nil +} diff --git a/node/pkg/wfetcher/coinbase/type.go b/node/pkg/wfetcher/coinbase/type.go index b6653f1c1..451e8a59c 100644 --- a/node/pkg/wfetcher/coinbase/type.go +++ b/node/pkg/wfetcher/coinbase/type.go @@ -5,6 +5,19 @@ const ( ) /* +// Request +{ + "type": "subscribe", + "product_ids": [ + "ETH-USD", + "BTC-USD" + ], + "channels": ["ticker"] +} +*/ + +/* +// response { "type": "ticker", "sequence": 37475248783, diff --git a/node/pkg/wfetcher/coinbase/utils.go b/node/pkg/wfetcher/coinbase/utils.go new file mode 100644 index 000000000..5ea1d9181 --- /dev/null +++ b/node/pkg/wfetcher/coinbase/utils.go @@ -0,0 +1,50 @@ +package coinbase + +import ( + "encoding/json" + "fmt" + "time" + + "bisonai.com/orakl/node/pkg/wfetcher" +) + +func MessageToTicker(msg map[string]any) (Ticker, error) { + var ticker Ticker + jsonData, err := json.Marshal(msg) + if err != nil { + return ticker, err + } + + err = json.Unmarshal(jsonData, &ticker) + if err != nil { + return ticker, err + } + + return ticker, nil +} + +func TickerToFeedData(ticker Ticker, feedMap map[string]int32) (wfetcher.FeedData, error) { + feedData := wfetcher.FeedData{} + + timestamp, err := time.Parse(time.RFC3339Nano, ticker.Time) + if err != nil { + timestamp = time.Now() + } + + id, exists := feedMap[ticker.ProductID] + if !exists { + return feedData, fmt.Errorf("feed not found") + } + + value, err := wfetcher.PriceStringToFloat64(ticker.Price) + if err != nil { + return feedData, err + } + + feedData.FeedId = id + feedData.Value = value + feedData.Timestamp = timestamp + + return feedData, nil + +}